This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 37c70c482a0 [FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state (#25915)
37c70c482a0 is described below

commit 37c70c482a09845370cb6e694a26f55950c4699f
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Fri Jan 10 16:35:02 2025 +0800

    [FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state (#25915)
    
    * [FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state
---
 .../org/apache/flink/runtime/taskmanager/Task.java | 58 ++++++++++-------
 .../apache/flink/runtime/taskmanager/TaskTest.java | 74 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ea91175c966..dff312e2ae4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1203,12 +1203,19 @@ public class Task
                 return;
             }
 
-            if (current == ExecutionState.DEPLOYING || current == 
ExecutionState.CREATED) {
+            if (current == ExecutionState.CREATED) {
                 if (transitionState(current, targetState, cause)) {
                     // if we manage this state transition, then the invokable 
gets never called
                     // we need not call cancel on it
                     return;
                 }
+            } else if (current == ExecutionState.DEPLOYING) {
+                if (transitionState(current, targetState, cause)) {
+                    // task may hang on the invokable constructor or static code
+                    // we need watchdog to ensure the task does not remain hanging
+                    startTaskCancellationWatchDog();
+                    return;
+                }
             } else if (current == ExecutionState.INITIALIZING
                     || current == ExecutionState.RUNNING) {
                 if (transitionState(current, targetState, cause)) {
@@ -1271,29 +1278,7 @@ public class Task
                                 FatalExitExceptionHandler.INSTANCE);
                         interruptingThread.start();
 
-                        // if a cancellation timeout is set, the watchdog 
thread kills the process
-                        // if graceful cancellation does not succeed
-                        if (taskCancellationTimeout > 0) {
-                            Runnable cancelWatchdog =
-                                    new TaskCancelerWatchDog(
-                                            taskInfo,
-                                            executingThread,
-                                            taskManagerActions,
-                                            taskCancellationTimeout,
-                                            jobId);
-
-                            Thread watchDogThread =
-                                    new Thread(
-                                            executingThread.getThreadGroup(),
-                                            cancelWatchdog,
-                                            String.format(
-                                                    "Cancellation Watchdog for 
%s (%s).",
-                                                    taskNameWithSubtask, 
executionId));
-                            watchDogThread.setDaemon(true);
-                            watchDogThread.setUncaughtExceptionHandler(
-                                    FatalExitExceptionHandler.INSTANCE);
-                            watchDogThread.start();
-                        }
+                        startTaskCancellationWatchDog();
                     }
                     return;
                 }
@@ -1306,6 +1291,31 @@ public class Task
         }
     }
 
+    private void startTaskCancellationWatchDog() {
+        // if a cancellation timeout is set, the watchdog thread kills the 
process
+        // if graceful cancellation does not succeed
+        if (taskCancellationTimeout > 0) {
+            Runnable cancelWatchdog =
+                    new TaskCancelerWatchDog(
+                            taskInfo,
+                            executingThread,
+                            taskManagerActions,
+                            taskCancellationTimeout,
+                            jobId);
+
+            Thread watchDogThread =
+                    new Thread(
+                            executingThread.getThreadGroup(),
+                            cancelWatchdog,
+                            String.format(
+                                    "Cancellation Watchdog for %s (%s).",
+                                    taskNameWithSubtask, executionId));
+            watchDogThread.setDaemon(true);
+            
watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+            watchDogThread.start();
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Partition State Listeners
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 45dfcd32c0d..fb3058087a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -935,6 +935,36 @@ public class TaskTest extends TestLogger {
         task.getExecutingThread().join();
     }
 
+    /**
+     * Tests that the cancellation watch dog reports a fatal error if the task is cancelled
+     * while it is still stuck in the invokable's constructor.
+     */
+    @Test
+    public void testWatchDogThrowFatalErrorOnTaskStuckInInstantiation() throws 
Exception {
+        final InterruptOnFatalErrorTaskManagerActions taskManagerActions =
+                new InterruptOnFatalErrorTaskManagerActions();
+
+        final Configuration config = new Configuration();
+        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 
Duration.ofMillis(5));
+        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 
Duration.ofMillis(1000L));
+
+        final Task task =
+                createTaskBuilder()
+                        .setInvokable(InvokableBlockingInInstantiation.class)
+                        .setTaskManagerConfig(config)
+                        .setTaskManagerActions(taskManagerActions)
+                        .build(Executors.directExecutor());
+        taskManagerActions.setExecutingThread(task.getExecutingThread());
+
+        task.startTaskThread();
+        InvokableBlockingInInstantiation.await();
+        task.cancelExecution();
+        task.getExecutingThread().join();
+
+        // Expect fatal error to recover
+        assertTrue(taskManagerActions.hasFatalError());
+    }
+
     /**
      * The 'invoke' method holds a lock (trigger awaitLatch after acquisition) 
and cancel cannot
      * complete because it also tries to acquire the same lock. This is 
resolved by the watch dog,
@@ -1284,6 +1314,26 @@ public class TaskTest extends TestLogger {
         }
     }
 
+    /** Customized TaskManagerActions that interrupts task thread on fatal 
error. */
+    private static class InterruptOnFatalErrorTaskManagerActions extends 
NoOpTaskManagerActions {
+        private boolean fatalError = false;
+        private Thread executingThread;
+
+        @Override
+        public void notifyFatalError(String message, Throwable cause) {
+            fatalError = true;
+            executingThread.interrupt();
+        }
+
+        public boolean hasFatalError() {
+            return fatalError;
+        }
+
+        public void setExecutingThread(Thread executingThread) {
+            this.executingThread = executingThread;
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  helper functions
     // ------------------------------------------------------------------------
@@ -1571,6 +1621,30 @@ public class TaskTest extends TestLogger {
         public void cancel() {}
     }
 
+    /** {@link AbstractInvokable} which blocks in instantiation. */
+    public static final class InvokableBlockingInInstantiation extends 
AbstractInvokable {
+        /** Declared static, otherwise there's no way to access it when 
blocking in constructor. */
+        static final OneShotLatch AWAIT_LATCH = new OneShotLatch();
+
+        public InvokableBlockingInInstantiation(Environment environment)
+                throws InterruptedException {
+            super(environment);
+            while (true) {
+                synchronized (this) {
+                    AWAIT_LATCH.trigger();
+                    wait();
+                }
+            }
+        }
+
+        @Override
+        public void invoke() {}
+
+        static void await() throws InterruptedException {
+            AWAIT_LATCH.await();
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  test exceptions
     // ------------------------------------------------------------------------

Reply via email to