[ 
https://issues.apache.org/jira/browse/FLINK-34261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

junzhong qin closed FLINK-34261.
--------------------------------
    Resolution: Fixed

Closing this issue because TaskExecutorManager has been removed from the master 
branch.

> When slotmanager.redundant-taskmanager-num is set to a value greater than 1, 
> redundant task managers may be repeatedly released and requested
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34261
>                 URL: https://issues.apache.org/jira/browse/FLINK-34261
>             Project: Flink
>          Issue Type: Bug
>            Reporter: junzhong qin
>            Assignee: junzhong qin
>            Priority: Not a Priority
>         Attachments: image-2024-01-29-17-29-15-453.png
>
>
> Redundant task managers are extra task managers that Flink starts to speed up 
> job recovery after a task manager is lost. However, when we configure 
> {code:java}
> slotmanager.redundant-taskmanager-num: 2 // any value greater than 1{code}
> Flink repeatedly releases and re-requests redundant task managers.
> h2. Root cause
>  # When `slotmanager.redundant-taskmanager-num` is set to a value greater 
> than 1 and one redundant Task Manager is registered, the 
> `checkResourceRequirementsWithDelay()` function is called.
>  # Within `checkResourceRequirementsWithDelay()`, if the `missingResources` 
> list is empty, the `taskExecutorManager` will invoke 
> `clearPendingTaskManagerSlots()` to clear the pending slots.
>  # This may result in the release of other redundant Task Managers. In such 
> cases, the `TaskExecutorManager` will request new redundant Task Managers 
> when the existing ones are insufficient.
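
The three steps above can be sketched as a toy simulation. This is not Flink's code: the class and method names are hypothetical, and it assumes that the workers needed by the job are already registered, that pending workers register one at a time, and that the redundancy check runs between registrations. Under those assumptions it counts how many pending task managers are released before the cluster settles.

```java
// Toy model of the release/request cycle; not taken from the Flink code base.
final class RedundantTmChurnSketch {

    /** Returns how many pending task managers are released before the cluster settles. */
    static int simulate(int requiredTms, int redundantTms) {
        int registered = requiredTms; // assumption: the job's own workers are already up
        int pending = 0;
        int released = 0;
        while (registered < requiredTms + redundantTms) {
            // Redundancy check: request whatever is missing to reach the target.
            pending += requiredTms + redundantTms - registered - pending;
            // One pending worker registers.
            pending--;
            registered++;
            // The job has no missing resources, so all remaining pending
            // workers are cleared (this models step 2 above).
            released += pending;
            pending = 0;
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println("redundant=1 -> released " + simulate(2, 1)); // 0
        System.out.println("redundant=2 -> released " + simulate(2, 2)); // 1
        System.out.println("redundant=3 -> released " + simulate(2, 3)); // 3
    }
}
```

With one redundant task manager nothing is pending when it registers, so nothing is released; with two or more, every registration discards the remaining pending workers, which then have to be requested again.
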
> h2. Reproduce
> We can reproduce this with the [Flink Kubernetes Operator (using minikube 
> here)|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/]. 
> Here is an example YAML file:
>  
> {code:java}
> # redundant-tm.yaml
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: redundant-tm
> spec:
>   image: flink:1.18
>   flinkVersion: v1_18
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "2"
>     slotmanager.redundant-taskmanager-num: "2"
>     cluster.fine-grained-resource-management.enabled: "false"
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "1024m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "1024m"
>       cpu: 1
>   job:
>     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
>     parallelism: 3
>     upgradeMode: stateless
>   logConfiguration:
>     log4j-console.properties: |+
>       rootLogger.level = DEBUG
>       rootLogger.appenderRef.file.ref = LogFile
>       rootLogger.appenderRef.console.ref = LogConsole
>       appender.file.name = LogFile
>       appender.file.type = File
>       appender.file.append = false
>       appender.file.fileName = ${sys:log.file}
>       appender.file.layout.type = PatternLayout
>       appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n{code}
> After executing:
> {code:java}
> kubectl create -f redundant-tm.yaml
> kubectl port-forward svc/redundant-tm 8081{code}
> The JM's log then shows redundant TMs being repeatedly released and requested:
> {code:java}
> // release
> 2024-01-29 09:26:25,033 INFO 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Registering TaskManager with ResourceID redundant-tm-taskmanager-1-4 
> (pekko.tcp://[email protected]:6122/user/rpc/taskmanager_0) at 
> ResourceManager
> 2024-01-29 09:26:25,060 DEBUG 
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Registering task executor redundant-tm-taskmanager-1-4 under 
> 44c649b2d84e87cdd5e6c53971f8b877 at the slot manager.
> 2024-01-29 09:26:25,061 INFO 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker redundant-tm-taskmanager-1-4 is registered.
> 2024-01-29 09:26:25,061 INFO 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker redundant-tm-taskmanager-1-4 with resource spec WorkerResourceSpec 
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb 
> (241591914 bytes), numSlots=2} was requested in current attempt. Current 
> pending count after registering: 1.
> 2024-01-29 09:26:25,196 DEBUG 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec 
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb 
> (241591914 bytes), numSlots=2}, numNeeded=3, unwantedWorkers=[]}].
> 2024-01-29 09:26:25,196 INFO 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> need release 1 workers, current worker number 4, declared worker number 3
> 2024-01-29 09:26:25,199 INFO 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Stopping worker redundant-tm-taskmanager-1-3.
> 2024-01-29 09:26:25,199 INFO 
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Stopping 
> TaskManager pod redundant-tm-taskmanager-1-3.
> 2024-01-29 09:26:25,203 INFO 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker redundant-tm-taskmanager-1-3 with resource spec WorkerResourceSpec 
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb 
> (241591914 bytes), numSlots=2} was requested in current attempt and has not 
> registered. Current pending count after removing: 0. 
> ......
> // request
> 2024-01-29 09:26:31,622 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 12 as completed for source Source: Events Generator Source.
> 2024-01-29 09:26:32,615 DEBUG 
> org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager [] - 
> Allocating 1 task executors for redundancy.
> 2024-01-29 09:26:32,685 DEBUG 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec 
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb 
> (241591914 bytes), numSlots=2}, numNeeded=4, unwantedWorkers=[]}].
> 2024-01-29 09:26:32,685 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> need request 1 new workers, current worker number 3, declared worker number 4
> 2024-01-29 09:26:32,685 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=1.0, 
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, 
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 
> bytes), numSlots=2}, current pending count: 1. {code}
> !image-2024-01-29-17-29-15-453.png!
> The job has a parallelism of 3 and each Task Manager offers 2 slots, so it 
> requires 4 Task Managers (2 for task execution and 2 for redundancy). However, 
> we observed that the Task Manager "redundant-tm-taskmanager-1-3" was 
> initially requested but later released.
>  
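
The expected worker count quoted above can be checked with a small calculation. This is a minimal sketch, assuming the job needs one slot per unit of parallelism (all tasks share slots); the class and method names are hypothetical and not part of Flink.

```java
// Hypothetical helper: expected number of task managers for the reproduction setup.
final class ExpectedWorkerCount {

    static int expectedWorkers(int parallelism, int slotsPerTm, int redundantTms) {
        // Ceiling division: task managers needed to host all slots of the job.
        int forJob = (parallelism + slotsPerTm - 1) / slotsPerTm;
        return forJob + redundantTms;
    }

    public static void main(String[] args) {
        // parallelism: 3, taskmanager.numberOfTaskSlots: 2,
        // slotmanager.redundant-taskmanager-num: 2
        System.out.println(expectedWorkers(3, 2, 2)); // prints 4
    }
}
```

This matches the numNeeded=4 value in the log, while the release step temporarily drops the declaration to numNeeded=3.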



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
