[
https://issues.apache.org/jira/browse/FLINK-34261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
junzhong qin updated FLINK-34261:
---------------------------------
Description:
Redundant task managers are extra task managers started by Flink, to speed up
job recovery in case of failures due to task manager lost. But when we
configured
{code:java}
slotmanager.redundant-taskmanager-num: 2 // any value greater than 1{code}
Flink will release and request redundant TM repeatedly.
We can reproduce this situation by using [Flink Kubernetes Operator (using
minikube
here)|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/],
here is an example yaml file:
{code:java}
// redundant-tm.yaml
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################apiVersion:
flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: redundant-tm
spec:
image: flink:1.18
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
slotmanager.redundant-taskmanager-num: "2"
cluster.fine-grained-resource-management.enabled: "false"
serviceAccount: flink
jobManager:
resource:
memory: "1024m"
cpu: 1
taskManager:
resource:
memory: "1024m"
cpu: 1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 3
upgradeMode: stateless
logConfiguration:
log4j-console.properties: |+
rootLogger.level = DEBUG
rootLogger.appenderRef.file.ref = LogFile
rootLogger.appenderRef.console.ref = LogConsole
appender.file.name = LogFile
appender.file.type = File
appender.file.append = false
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
- %m%n{code}
After executing:
{code:java}
kubectl create -f redundant-tm.yaml
kubectl port-forward svc/redundant-tm 8081{code}
We can find repeatedly release and request redundant TM in JM's log:
{code:java}
// release
2024-01-29 09:26:25,033 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Registering TaskManager with ResourceID redundant-tm-taskmanager-1-4
(pekko.tcp://[email protected]:6122/user/rpc/taskmanager_0) at ResourceManager
2024-01-29 09:26:25,060 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Registering task executor redundant-tm-taskmanager-1-4 under
44c649b2d84e87cdd5e6c53971f8b877 at the slot manager.
2024-01-29 09:26:25,061 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Worker redundant-tm-taskmanager-1-4 is registered.
2024-01-29 09:26:25,061 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Worker redundant-tm-taskmanager-1-4 with resource spec WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2} was requested in current attempt. Current pending count
after registering: 1.
2024-01-29 09:26:25,196 DEBUG
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2}, numNeeded=3, unwantedWorkers=[]}].
2024-01-29 09:26:25,196 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - need
release 1 workers, current worker number 4, declared worker number 3
2024-01-29 09:26:25,199 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Stopping worker redundant-tm-taskmanager-1-3.
2024-01-29 09:26:25,199 INFO
org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Stopping
TaskManager pod redundant-tm-taskmanager-1-3.
2024-01-29 09:26:25,203 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Worker redundant-tm-taskmanager-1-3 with resource spec WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2} was requested in current attempt and has not registered.
Current pending count after removing: 0.
// request
2024-01-29 09:26:31,622 INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
checkpoint 12 as completed for source Source: Events Generator
Source.2024-01-29 09:26:32,615 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager [] -
Allocating 1 task executors for redundancy.2024-01-29 09:26:32,685 DEBUG
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2}, numNeeded=4, unwantedWorkers=[]}].2024-01-29 09:26:32,685
INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager []
- need request 1 new workers, current worker number 3, declared worker number
42024-01-29 09:26:32,685 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Requesting new worker with resource spec WorkerResourceSpec {cpuCores=1.0,
taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2}, current pending count: 1. {code}
!image-2024-01-29-17-29-15-453.png!
The job has a parallelism of 3, which requires 4 Task Managers (2 for task
execution and 2 for redundancy). However, we observed that the Task Manager
"redundant-tm-taskmanager-1-3" was initially requested but later released.
was:
Redundant task managers are extra task managers started by Flink, to speed up
job recovery in case of failures due to task manager lost. But when we
configured
{code:java}
slotmanager.redundant-taskmanager-num: 2 // any value greater than 1{code}
Flink will release and request redundant TM repeatedly.
We can reproduce this situation by using [Flink Kubernetes Operator (using
minikube
here)|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/],
here is an example yaml file:
{code:java}
// redundant-tm.yaml
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################apiVersion:
flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: redundant-tm
spec:
image: flink:1.18
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
slotmanager.redundant-taskmanager-num: "2"
cluster.fine-grained-resource-management.enabled: "false"
serviceAccount: flink
jobManager:
resource:
memory: "1024m"
cpu: 1
taskManager:
resource:
memory: "1024m"
cpu: 1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 3
upgradeMode: stateless
logConfiguration:
log4j-console.properties: |+
rootLogger.level = DEBUG
rootLogger.appenderRef.file.ref = LogFile
rootLogger.appenderRef.console.ref = LogConsole
appender.file.name = LogFile
appender.file.type = File
appender.file.append = false
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
- %m%n{code}
After executing:
{code:java}
kubectl create -f redundant-tm.yaml
kubectl port-forward svc/redundant-tm 8081{code}
We can find repeatedly release and request redundant TM in JM's log:
{code:java}
2024-01-29 09:26:25,033 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Registering TaskManager with ResourceID redundant-tm-taskmanager-1-4
(pekko.tcp://[email protected]:6122/user/rpc/taskmanager_0) at
ResourceManager2024-01-29 09:26:25,060 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Registering task executor redundant-tm-taskmanager-1-4 under
44c649b2d84e87cdd5e6c53971f8b877 at the slot manager.2024-01-29 09:26:25,061
INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager []
- Worker redundant-tm-taskmanager-1-4 is registered.2024-01-29 09:26:25,061
INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager []
- Worker redundant-tm-taskmanager-1-4 with resource spec WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2} was requested in current attempt. Current pending count
after registering: 1.2024-01-29 09:26:25,196 DEBUG
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2}, numNeeded=3, unwantedWorkers=[]}].2024-01-29 09:26:25,196
INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager []
- need release 1 workers, current worker number 4, declared worker number
32024-01-29 09:26:25,199 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Stopping worker redundant-tm-taskmanager-1-3.2024-01-29 09:26:25,199 INFO
org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Stopping
TaskManager pod redundant-tm-taskmanager-1-3.2024-01-29 09:26:25,203 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Worker redundant-tm-taskmanager-1-3 with resource spec WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2} was requested in current attempt and has not registered.
Current pending count after removing: 0.2024-01-29 09:26:25,203 DEBUG
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - No
open TaskExecutor connection redundant-tm-taskmanager-1-3. Ignoring close
TaskExecutor connection. Closing reason was: resource is no longer needed
2024-01-29 09:26:31,622 INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
checkpoint 12 as completed for source Source: Events Generator
Source.2024-01-29 09:26:32,615 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager [] -
Allocating 1 task executors for redundancy.2024-01-29 09:26:32,685 DEBUG
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec
{cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2}, numNeeded=4, unwantedWorkers=[]}].2024-01-29 09:26:32,685
INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager []
- need request 1 new workers, current worker number 3, declared worker number
42024-01-29 09:26:32,685 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Requesting new worker with resource spec WorkerResourceSpec {cpuCores=1.0,
taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
bytes), numSlots=2}, current pending count: 1. {code}
!image-2024-01-29-17-29-15-453.png!
The job has a parallelism of 3, which requires 4 Task Managers (2 for task
execution and 2 for redundancy). However, we observed that the Task Manager
"redundant-tm-taskmanager-1-3" was initially requested but later released.
> When slotmanager.redundant-taskmanager-num is set to a value greater than 1,
> redundant task managers may be repeatedly released and requested
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34261
> URL: https://issues.apache.org/jira/browse/FLINK-34261
> Project: Flink
> Issue Type: Bug
> Reporter: junzhong qin
> Priority: Not a Priority
> Attachments: image-2024-01-29-17-29-15-453.png
>
>
> Redundant task managers are extra task managers started by Flink, to speed up
> job recovery in case of failures due to task manager lost. But when we
> configured
> {code:java}
> slotmanager.redundant-taskmanager-num: 2 // any value greater than 1{code}
> Flink will release and request redundant TM repeatedly.
> We can reproduce this situation by using [Flink Kubernetes Operator (using
> minikube
> here)|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/],
> here is an example yaml file:
>
> {code:java}
> // redundant-tm.yaml
> ################################################################################
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################apiVersion:
> flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
> name: redundant-tm
> spec:
> image: flink:1.18
> flinkVersion: v1_18
> flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> slotmanager.redundant-taskmanager-num: "2"
> cluster.fine-grained-resource-management.enabled: "false"
> serviceAccount: flink
> jobManager:
> resource:
> memory: "1024m"
> cpu: 1
> taskManager:
> resource:
> memory: "1024m"
> cpu: 1
> job:
> jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> parallelism: 3
> upgradeMode: stateless
> logConfiguration:
> log4j-console.properties: |+
> rootLogger.level = DEBUG
> rootLogger.appenderRef.file.ref = LogFile
> rootLogger.appenderRef.console.ref = LogConsole
> appender.file.name = LogFile
> appender.file.type = File
> appender.file.append = false
> appender.file.fileName = ${sys:log.file}
> appender.file.layout.type = PatternLayout
> appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c
> %x - %m%n{code}
> After executing:
> {code:java}
> kubectl create -f redundant-tm.yaml
> kubectl port-forward svc/redundant-tm 8081{code}
> We can find repeatedly release and request redundant TM in JM's log:
> {code:java}
> // release
> 2024-01-29 09:26:25,033 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registering TaskManager with ResourceID redundant-tm-taskmanager-1-4
> (pekko.tcp://[email protected]:6122/user/rpc/taskmanager_0) at
> ResourceManager
> 2024-01-29 09:26:25,060 DEBUG
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
> [] - Registering task executor redundant-tm-taskmanager-1-4 under
> 44c649b2d84e87cdd5e6c53971f8b877 at the slot manager.
> 2024-01-29 09:26:25,061 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker redundant-tm-taskmanager-1-4 is registered.
> 2024-01-29 09:26:25,061 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker redundant-tm-taskmanager-1-4 with resource spec WorkerResourceSpec
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes), numSlots=2} was requested in current attempt. Current
> pending count after registering: 1.
> 2024-01-29 09:26:25,196 DEBUG
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes), numSlots=2}, numNeeded=3, unwantedWorkers=[]}].
> 2024-01-29 09:26:25,196 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> need release 1 workers, current worker number 4, declared worker number 3
> 2024-01-29 09:26:25,199 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Stopping worker redundant-tm-taskmanager-1-3.
> 2024-01-29 09:26:25,199 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Stopping
> TaskManager pod redundant-tm-taskmanager-1-3.
> 2024-01-29 09:26:25,203 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker redundant-tm-taskmanager-1-3 with resource spec WorkerResourceSpec
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes), numSlots=2} was requested in current attempt and has not
> registered. Current pending count after removing: 0.
> // request
> 2024-01-29 09:26:31,622 INFO
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
> checkpoint 12 as completed for source Source: Events Generator
> Source.2024-01-29 09:26:32,615 DEBUG
> org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager [] -
> Allocating 1 task executors for redundancy.2024-01-29 09:26:32,685 DEBUG
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Update resource declarations to [ResourceDeclaration{spec=WorkerResourceSpec
> {cpuCores=1.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0
> bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes), numSlots=2}, numNeeded=4, unwantedWorkers=[]}].2024-01-29
> 09:26:32,685 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> need request 1 new workers, current worker number 3, declared worker number
> 42024-01-29 09:26:32,685 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914
> bytes), numSlots=2}, current pending count: 1. {code}
> !image-2024-01-29-17-29-15-453.png!
> The job has a parallelism of 3, which requires 4 Task Managers (2 for task
> execution and 2 for redundancy). However, we observed that the Task Manager
> "redundant-tm-taskmanager-1-3" was initially requested but later released.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)