junzhong qin created FLINK-34261:
------------------------------------

             Summary: When slotmanager.redundant-taskmanager-num is set to a 
value greater than 1, redundant task managers may be repeatedly released and 
requested
                 Key: FLINK-34261
                 URL: https://issues.apache.org/jira/browse/FLINK-34261
             Project: Flink
          Issue Type: Bug
            Reporter: junzhong qin
         Attachments: image-2024-01-29-17-29-15-453.png

Redundant task managers are extra task managers started by Flink to speed up 
job recovery after failures caused by task manager loss. But when we 
configure 
{code:java}
slotmanager.redundant-taskmanager-num: 2 // any value greater than 1{code}
Flink repeatedly releases and re-requests the redundant TMs.
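For reference, the expected steady-state worker count is simple arithmetic: enough task managers to host all parallel tasks, plus the configured number of redundant ones. A minimal sketch (the helper name is illustrative, not Flink code):
{code:java}
public class RequiredWorkers {
    // Expected TM count: ceil(parallelism / slots per TM)
    // plus slotmanager.redundant-taskmanager-num extra TMs.
    static int requiredTaskManagers(int parallelism, int slotsPerTm, int redundantTms) {
        int forTasks = (parallelism + slotsPerTm - 1) / slotsPerTm; // ceiling division
        return forTasks + redundantTms;
    }

    public static void main(String[] args) {
        // The repro below uses parallelism 3, 2 slots per TM, 2 redundant TMs.
        System.out.println(requiredTaskManagers(3, 2, 2)); // prints 4
    }
}
{code}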

We can reproduce this situation with the [Flink Kubernetes Operator (using 
minikube 
here)|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/]. 
Here is an example YAML file:

 
{code:java}
# redundant-tm.yaml
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: redundant-tm
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    slotmanager.redundant-taskmanager-num: "2"
    cluster.fine-grained-resource-management.enabled: "false"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 3
    upgradeMode: stateless
  logConfiguration:
    log4j-console.properties: |+
      rootLogger.level = DEBUG
      rootLogger.appenderRef.file.ref = LogFile
      rootLogger.appenderRef.console.ref = LogConsole
      appender.file.name = LogFile
      appender.file.type = File
      appender.file.append = false
      appender.file.fileName = ${sys:log.file}
      appender.file.layout.type = PatternLayout
      appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n{code}
After executing:
{code:java}
kubectl create -f redundant-tm.yaml
kubectl port-forward svc/redundant-tm 8081{code}
We can see the redundant TM being repeatedly released and requested in the JM's log:
{code:java}
2024-01-29 09:26:25,033 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Registering TaskManager with ResourceID redundant-tm-taskmanager-1-4 (pekko.tcp://flink@10.244.1.196:6122/user/rpc/taskmanager_0) at ResourceManager
2024-01-29 09:26:25,060 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Registering task executor redundant-tm-taskmanager-1-4 under 44c649b2d84e87cdd5e6c53971f8b877 at the slot manager.
2024-01-29 09:26:25,061 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-4 is registered.
2024-01-29 09:26:25,061 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-4 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2} was requested in current attempt. Current pending count after registering: 1.
2024-01-29 09:26:25,196 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2}, numNeeded=3, unwantedWorkers=[]}].
2024-01-29 09:26:25,196 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - need release 1 workers, current worker number 4, declared worker number 3
2024-01-29 09:26:25,199 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Stopping worker redundant-tm-taskmanager-1-3.
2024-01-29 09:26:25,199 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Stopping TaskManager pod redundant-tm-taskmanager-1-3.
2024-01-29 09:26:25,203 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-3 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2} was requested in current attempt and has not registered. Current pending count after removing: 0.
2024-01-29 09:26:25,203 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - No open TaskExecutor connection redundant-tm-taskmanager-1-3. Ignoring close TaskExecutor connection. Closing reason was: resource is no longer needed

2024-01-29 09:26:31,622 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 12 as completed for source Source: Events Generator Source.
2024-01-29 09:26:32,615 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager [] - Allocating 1 task executors for redundancy.
2024-01-29 09:26:32,685 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2}, numNeeded=4, unwantedWorkers=[]}].
2024-01-29 09:26:32,685 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - need request 1 new workers, current worker number 3, declared worker number 4
2024-01-29 09:26:32,685 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2}, current pending count: 1. {code}
!image-2024-01-29-17-29-15-453.png!

The job has a parallelism of 3, which, with 2 slots per task manager, requires 
4 task managers (2 for task execution and 2 for redundancy). However, as the 
log shows, the task manager redundant-tm-taskmanager-1-3 was released even 
though it had just been requested, and a new redundant worker was requested 
again shortly afterwards.
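One plausible pattern behind such a loop (purely a hypothesis, not Flink's actual code) is two resource checks that account for redundancy differently: one deriving demand from free registered slots, the other requiring whole spare task managers. The toy simulation below reproduces an endless release/request cycle under exactly that assumption:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Toy simulation only -- NOT Flink's implementation. It shows how two
// resource checks with inconsistent redundancy accounting can keep
// releasing and re-requesting the same worker.
public class RedundancyOscillation {
    static final int SLOTS_PER_TM = 2;   // taskmanager.numberOfTaskSlots
    static final int USED_SLOTS = 3;     // job parallelism
    static final int REDUNDANT_TMS = 2;  // slotmanager.redundant-taskmanager-num

    static List<String> simulate(int rounds) {
        int registered = 4; // 2 TMs for tasks + 2 redundant, all registered
        List<String> actions = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            // Hypothetical check A: demand derived from free *slots*. With 4 TMs
            // there are 4*2 - 3 = 5 free slots, more than the 4 slots that
            // 2 redundant TMs provide, so one worker looks superfluous.
            if (registered * SLOTS_PER_TM - USED_SLOTS > REDUNDANT_TMS * SLOTS_PER_TM) {
                registered--;
                actions.add("release");
            }
            // Hypothetical check B: redundancy measured in whole spare TMs. A TM
            // with only one free slot does not count, so a new worker is requested.
            int spareTms = (registered * SLOTS_PER_TM - USED_SLOTS) / SLOTS_PER_TM;
            if (spareTms < REDUNDANT_TMS) {
                registered++; // the new worker registers, re-triggering check A
                actions.add("request");
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(simulate(2)); // [release, request, release, request]
    }
}
{code}
Under these (assumed) rules the two checks can never agree, which matches the alternating numNeeded=3 / numNeeded=4 declarations in the log above.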

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
