[ 
https://issues.apache.org/jira/browse/FLINK-39085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Zhang updated FLINK-39085:
-----------------------------
    Description: 
In HA mode, when a JM failure occurs, the TM disconnects from the JM and fails 
all tasks running on it, similar to the task cancellation process. However, 
during this phase, certain tasks may get stuck in cancellation. If the JM 
recovers quickly and begins redeploying tasks, a problematic scenario can 
arise: a single slot on the TM may end up hosting two instances of the same 
logical task, one from the pre-failover execution and one newly deployed after 
JM recovery.

Notably, after JM failover, the {{executionGraphId}} changes, which results in 
different {{ExecutionAttemptID}}s for the old and new task instances. 
Because the current implementation of {{TaskSlotTable.addTask()}} only checks 
for conflicts based on the {{ExecutionAttemptID}}, it fails to recognize that 
these two tasks represent the same logical entity. Consequently, the new task 
is successfully deployed and starts running alongside the lingering old 
instance within the same slot.
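
For illustration only, here is a simplified sketch of the kind of check involved 
(not the actual {{TaskSlotTable}} code): if the slot's task map is keyed solely 
by the attempt ID, two attempts of the same logical task never collide once the 
{{executionGraphId}} prefix changes.
{code:java}
// Simplified illustration only; the real TaskSlotTable/TaskSlot code differs.
import java.util.HashMap;
import java.util.Map;

class SlotSketch {
    // Keyed by the full attempt ID, which (as the metric labels below suggest)
    // looks like "<executionGraphId>_<jobVertexId>_<subtaskIndex>_<attemptNumber>".
    private final Map<String, String> tasksByAttemptId = new HashMap<>();

    // Mirrors the conflict check described above: only the attempt ID is compared.
    boolean addTask(String executionAttemptId, String taskName) {
        if (tasksByAttemptId.containsKey(executionAttemptId)) {
            return false; // duplicate attempt ID -> rejected
        }
        tasksByAttemptId.put(executionAttemptId, taskName);
        return true;
    }

    public static void main(String[] args) {
        SlotSketch slot = new SlotSketch();
        // Pre-failover attempt (graph ID 1ca69b7f..., taken from the metrics below).
        slot.addTask("1ca69b7f..._cbc357cc..._0_0", "Source: Socket Stream -> Flat Map");
        // Post-failover attempt of the *same* logical task: the executionGraphId is
        // new, so the attempt ID differs, the check passes, and both instances coexist.
        boolean accepted = slot.addTask("ba4cbf5d..._cbc357cc..._0_0", "Source: Socket Stream -> Flat Map");
        System.out.println("second attempt accepted: " + accepted); // true
    }
}
{code}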

This raises a question: Is this behavior acceptable? Should the TaskExecutor 
instead reject the deployment of a new task while an older instance of the same 
logical task is still present in the slot, even if their 
{{ExecutionAttemptID}}s differ? Or, more aggressively, should the TM 
trigger a fatal error to prevent the inconsistency, resource contention, 
or correctness issues that such duplication can cause?
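
One conceivable direction, sketched with hypothetical names that do not exist in 
Flink: key the conflict check on the logical task identity (job vertex plus 
subtask index) instead of the attempt ID.
{code:java}
// Hypothetical guard, not existing Flink API: reject a deployment while an
// older instance of the same logical task still occupies the slot.
import java.util.HashSet;
import java.util.Set;

class LogicalTaskGuard {
    // Logical identity: (jobVertexId, subtaskIndex); the executionGraphId and
    // attempt number are deliberately excluded.
    record LogicalTaskId(String jobVertexId, int subtaskIndex) {}

    private final Set<LogicalTaskId> occupied = new HashSet<>();

    // Returns false if an instance of the same logical task is still present,
    // even though its ExecutionAttemptID would differ after JM failover.
    boolean tryAdd(String jobVertexId, int subtaskIndex) {
        return occupied.add(new LogicalTaskId(jobVertexId, subtaskIndex));
    }

    // Called once the old instance has actually terminated.
    void release(String jobVertexId, int subtaskIndex) {
        occupied.remove(new LogicalTaskId(jobVertexId, subtaskIndex));
    }

    public static void main(String[] args) {
        LogicalTaskGuard guard = new LogicalTaskGuard();
        guard.tryAdd("cbc357cc...", 0);                     // pre-failover instance, stuck in cancellation
        System.out.println(guard.tryAdd("cbc357cc...", 0)); // false: redeployment rejected
    }
}
{code}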

----

*A Simple Reproduction Example*
1. Setup: Modify the SocketWindowWordCount example by adding an uninterruptible 
sleep in the FlatMap (sketched below) to simulate a task that hangs during 
cancellation.
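A minimal sketch of such a modification, assuming the stock tokenizer from the 
example; the loop deliberately swallows interrupts so cancellation cannot 
complete:
{code:java}
// Sketch of the modified FlatMap: an uninterruptible sleep that ignores
// thread interrupts, so the task hangs when the TM tries to cancel it.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class StuckTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        for (String word : value.split("\\s")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
        // Uninterruptible sleep: keep sleeping until the deadline even if the
        // cancellation interrupt arrives.
        long deadline = System.currentTimeMillis() + 600_000; // 10 minutes
        while (System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(deadline - System.currentTimeMillis());
            } catch (InterruptedException ignored) {
                // Swallow the interrupt so task cancellation gets stuck.
            }
        }
    }
}
{code}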

2. Initial run: Start ZooKeeper, a standalone Flink cluster, and one 
TaskManager, then submit the modified job. The TM correctly deploys two tasks 
(as shown in tasks1.png in the attachments). Metrics (e.g., 
{{backPressuredTimeMsPerSecond}}) from a metrics reporter also show these two 
tasks.
{code:java}
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_attempt_id="1ca69b7ff26ebad2ffad095b5d244744_cbc357ccb763df2852fee8c4fc7d55f2_0_0",host="localhost",task_name="Source:_Socket_Stream____Flat_Map",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
 0.0
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="90bea66de1c231edf33913ecd54406c1",task_attempt_id="1ca69b7ff26ebad2ffad095b5d244744_90bea66de1c231edf33913ecd54406c1_0_0",host="localhost",task_name="TumblingProcessingTimeWindows____Sink:_Print_to_Std__Out",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
 0.0{code}
!tasks1.png|width=808,height=60!

3. JM failover: Kill and restart the JM to simulate a failover. After recovery, 
the TM ends up running three tasks (as shown in tasks2.png in the attachments): 
one lingering instance from before the failover, whose cancellation is stuck, 
plus two newly deployed tasks. The metrics reporter output shows three distinct 
tasks, confirming the duplication.
{code:java}
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_attempt_id="1ca69b7ff26ebad2ffad095b5d244744_cbc357ccb763df2852fee8c4fc7d55f2_0_0",host="localhost",task_name="Source:_Socket_Stream____Flat_Map",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
 0.0
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_attempt_id="ba4cbf5d8d59a24ce75da3eb0ef1a5e2_cbc357ccb763df2852fee8c4fc7d55f2_0_0",host="localhost",task_name="Source:_Socket_Stream____Flat_Map",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
 0.0
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="90bea66de1c231edf33913ecd54406c1",task_attempt_id="ba4cbf5d8d59a24ce75da3eb0ef1a5e2_90bea66de1c231edf33913ecd54406c1_0_0",host="localhost",task_name="TumblingProcessingTimeWindows____Sink:_Print_to_Std__Out",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
 0.0{code}
!tasks2.png|width=800,height=62!
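
As the labels above suggest, the {{task_attempt_id}} appears to follow the 
pattern {{executionGraphId_jobVertexId_subtaskIndex_attemptNumber}}: the 
lingering source task still carries the pre-failover graph ID {{1ca69b7f…}}, 
while the redeployed tasks carry the new one {{ba4cbf5d…}}. A throwaway helper 
(not part of Flink) makes the comparison explicit:
{code:java}
// Hypothetical helper, not part of Flink: split the task_attempt_id labels
// into their components to show that only the executionGraphId prefix differs.
public class AttemptIdDecoder {
    public static void main(String[] args) {
        String before = "1ca69b7ff26ebad2ffad095b5d244744_cbc357ccb763df2852fee8c4fc7d55f2_0_0";
        String after  = "ba4cbf5d8d59a24ce75da3eb0ef1a5e2_cbc357ccb763df2852fee8c4fc7d55f2_0_0";
        for (String id : new String[] {before, after}) {
            String[] parts = id.split("_");
            System.out.printf("graph=%s vertex=%s subtask=%s attempt=%s%n",
                    parts[0], parts[1], parts[2], parts[3]);
        }
        // Same jobVertexId, subtaskIndex, and attemptNumber -> the same logical
        // task, yet the ExecutionAttemptIDs differ because the graph ID changed.
    }
}
{code}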



> Potential task duplication in the same TM slot after JM failover
> ----------------------------------------------------------------
>
>                 Key: FLINK-39085
>                 URL: https://issues.apache.org/jira/browse/FLINK-39085
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Yi Zhang
>            Priority: Major
>         Attachments: tasks1.png, tasks2.png
>


