junzhong qin created FLINK-34261:
------------------------------------

             Summary: When slotmanager.redundant-taskmanager-num is set to a value greater than 1, redundant task managers may be repeatedly released and requested
                 Key: FLINK-34261
                 URL: https://issues.apache.org/jira/browse/FLINK-34261
             Project: Flink
          Issue Type: Bug
            Reporter: junzhong qin
         Attachments: image-2024-01-29-17-29-15-453.png
Redundant task managers are extra task managers started by Flink to speed up job recovery in case of failures caused by task manager loss. But when we configure

{code:java}
slotmanager.redundant-taskmanager-num: 2  # any value greater than 1
{code}

Flink releases and requests the redundant TaskManagers repeatedly. We can reproduce this situation with the [Flink Kubernetes Operator (using minikube here)|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/]; here is an example yaml file:

{code:java}
# redundant-tm.yaml
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: redundant-tm
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    slotmanager.redundant-taskmanager-num: "2"
    cluster.fine-grained-resource-management.enabled: "false"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 3
    upgradeMode: stateless
  logConfiguration:
    log4j-console.properties: |+
      rootLogger.level = DEBUG
      rootLogger.appenderRef.file.ref = LogFile
      rootLogger.appenderRef.console.ref = LogConsole
      appender.file.name = LogFile
      appender.file.type = File
      appender.file.append = false
      appender.file.fileName = ${sys:log.file}
      appender.file.layout.type = PatternLayout
      appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
{code}

After executing:

{code:java}
kubectl create -f redundant-tm.yaml
kubectl port-forward svc/redundant-tm 8081
{code}
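For reference, with this configuration the cluster should converge on 4 TaskManagers: ceil(3 / 2) = 2 workers to host the task slots, plus 2 whole standby workers from slotmanager.redundant-taskmanager-num. A minimal sketch of that arithmetic (class and method names are ours for illustration, not Flink's actual API):

{code:java}
// Expected steady-state worker count for the deployment above.
// Illustrative only: the names here are hypothetical, not Flink's API.
public final class ExpectedWorkers {

    static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    static int expectedWorkers(int parallelism, int slotsPerWorker, int redundantWorkers) {
        // Workers needed to host all task slots of the job...
        int workersForTasks = ceilDiv(parallelism, slotsPerWorker); // ceil(3 / 2) = 2
        // ...plus slotmanager.redundant-taskmanager-num whole standby workers.
        return workersForTasks + redundantWorkers;                  // 2 + 2 = 4
    }

    public static void main(String[] args) {
        // parallelism = 3, taskmanager.numberOfTaskSlots = 2,
        // slotmanager.redundant-taskmanager-num = 2
        System.out.println(expectedWorkers(3, 2, 2)); // prints 4
    }
}
{code}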
We can see the redundant TaskManager being repeatedly released and requested in the JobManager's log:

{code:java}
2024-01-29 09:26:25,033 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Registering TaskManager with ResourceID redundant-tm-taskmanager-1-4 (pekko.tcp://flink@10.244.1.196:6122/user/rpc/taskmanager_0) at ResourceManager
2024-01-29 09:26:25,060 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Registering task executor redundant-tm-taskmanager-1-4 under 44c649b2d84e87cdd5e6c53971f8b877 at the slot manager.
2024-01-29 09:26:25,061 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-4 is registered.
2024-01-29 09:26:25,061 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-4 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2} was requested in current attempt. Current pending count after registering: 1.
2024-01-29 09:26:25,196 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2}, numNeeded=3, unwantedWorkers=[]}].
2024-01-29 09:26:25,196 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - need release 1 workers, current worker number 4, declared worker number 3
2024-01-29 09:26:25,199 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Stopping worker redundant-tm-taskmanager-1-3.
2024-01-29 09:26:25,199 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Stopping TaskManager pod redundant-tm-taskmanager-1-3.
2024-01-29 09:26:25,203 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker redundant-tm-taskmanager-1-3 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2} was requested in current attempt and has not registered. Current pending count after removing: 0.
2024-01-29 09:26:25,203 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - No open TaskExecutor connection redundant-tm-taskmanager-1-3. Ignoring close TaskExecutor connection. Closing reason was: resource is no longer needed
2024-01-29 09:26:31,622 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 12 as completed for source Source: Events Generator Source.
2024-01-29 09:26:32,615 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager [] - Allocating 1 task executors for redundancy.
2024-01-29 09:26:32,685 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2}, numNeeded=4, unwantedWorkers=[]}].
2024-01-29 09:26:32,685 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - need request 1 new workers, current worker number 3, declared worker number 4
2024-01-29 09:26:32,685 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes), numSlots=2}, current pending count: 1.
{code}

!image-2024-01-29-17-29-15-453.png!

The job has a parallelism of 3, which requires 4 TaskManagers in total (2 to run the tasks and 2 for redundancy). However, the TaskManager redundant-tm-taskmanager-1-3, which had just been requested, was released again, and a replacement worker was requested a few seconds later, so the cluster keeps cycling instead of settling at 4 TaskManagers.
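The log suggests that two reconciliation passes disagree about the fourth worker: the resource declaration settles on numNeeded=3 and releases a worker, after which the redundancy check in TaskExecutorManager allocates one task executor for redundancy and the declaration jumps back to numNeeded=4. The toy below reproduces that 3 <-> 4 flip under one hypothetical pair of rules (redundancy accounted in slots in one pass and in whole workers in the other); it only illustrates the oscillation and is not Flink's actual SlotManager code:

{code:java}
// Toy model of the numNeeded=3 <-> numNeeded=4 flip seen in the log above.
// Both "passes" are hypothetical rules chosen to reproduce the symptom;
// they are NOT Flink's actual implementation.
public final class DeclarationFlipFlop {

    static final int SLOTS_PER_WORKER = 2; // taskmanager.numberOfTaskSlots
    static final int TASK_SLOTS = 3;       // job parallelism
    static final int REDUNDANT_TM_NUM = 2; // slotmanager.redundant-taskmanager-num

    static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    /** Pass A (hypothetical): accounts for redundancy in slots, not workers. */
    static int declareBySlots() {
        int requiredSlots = TASK_SLOTS + REDUNDANT_TM_NUM; // 3 + 2 = 5
        return ceilDiv(requiredSlots, SLOTS_PER_WORKER);   // declares 3 workers
    }

    /** Pass B (hypothetical): tops free slots up to REDUNDANT_TM_NUM whole workers. */
    static int declareByFreeSlots(int currentWorkers) {
        int freeSlots = currentWorkers * SLOTS_PER_WORKER - TASK_SLOTS;     // 3 workers -> 3 free
        int missingSlots = REDUNDANT_TM_NUM * SLOTS_PER_WORKER - freeSlots; // wants 4 -> 1 missing
        return currentWorkers + Math.max(0, ceilDiv(missingSlots, SLOTS_PER_WORKER)); // declares 4
    }

    public static void main(String[] args) {
        for (int round = 1; round <= 3; round++) {
            int declaredA = declareBySlots();              // 3 -> one worker released
            int declaredB = declareByFreeSlots(declaredA); // 4 -> one worker requested
            System.out.println("round " + round + ": pass A declares " + declaredA
                    + ", pass B declares " + declaredB);
        }
    }
}
{code}

With the configuration above, pass A yields ceilDiv(3 + 2, 2) = 3 and pass B tops it back up to 4, matching the alternating numNeeded=3 / numNeeded=4 declarations in the log.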