This is an automated email from the ASF dual-hosted git repository. huweihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 37c70c482a0 [FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state (#25915) 37c70c482a0 is described below commit 37c70c482a09845370cb6e694a26f55950c4699f Author: Zhanghao Chen <m...@outlook.com> AuthorDate: Fri Jan 10 16:35:02 2025 +0800 [FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state (#25915) * [FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state --- .../org/apache/flink/runtime/taskmanager/Task.java | 58 ++++++++++------- .../apache/flink/runtime/taskmanager/TaskTest.java | 74 ++++++++++++++++++++++ 2 files changed, 108 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index ea91175c966..dff312e2ae4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -1203,12 +1203,19 @@ public class Task return; } - if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) { + if (current == ExecutionState.CREATED) { if (transitionState(current, targetState, cause)) { // if we manage this state transition, then the invokable gets never called // we need not call cancel on it return; } + } else if (current == ExecutionState.DEPLOYING) { + if (transitionState(current, targetState, cause)) { + // task may hang on the invokable constructor or static code + // we need watchdog to ensure the task does not remain hanging + startTaskCancellationWatchDog(); + return; + } } else if (current == ExecutionState.INITIALIZING || current == ExecutionState.RUNNING) { if (transitionState(current, targetState, cause)) { @@ -1271,29 +1278,7 @@ public class Task FatalExitExceptionHandler.INSTANCE); interruptingThread.start(); - // if a cancellation timeout is set, the 
watchdog thread kills the process - // if graceful cancellation does not succeed - if (taskCancellationTimeout > 0) { - Runnable cancelWatchdog = - new TaskCancelerWatchDog( - taskInfo, - executingThread, - taskManagerActions, - taskCancellationTimeout, - jobId); - - Thread watchDogThread = - new Thread( - executingThread.getThreadGroup(), - cancelWatchdog, - String.format( - "Cancellation Watchdog for %s (%s).", - taskNameWithSubtask, executionId)); - watchDogThread.setDaemon(true); - watchDogThread.setUncaughtExceptionHandler( - FatalExitExceptionHandler.INSTANCE); - watchDogThread.start(); - } + startTaskCancellationWatchDog(); } return; } @@ -1306,6 +1291,31 @@ public class Task } } + private void startTaskCancellationWatchDog() { + // if a cancellation timeout is set, the watchdog thread kills the process + // if graceful cancellation does not succeed + if (taskCancellationTimeout > 0) { + Runnable cancelWatchdog = + new TaskCancelerWatchDog( + taskInfo, + executingThread, + taskManagerActions, + taskCancellationTimeout, + jobId); + + Thread watchDogThread = + new Thread( + executingThread.getThreadGroup(), + cancelWatchdog, + String.format( + "Cancellation Watchdog for %s (%s).", + taskNameWithSubtask, executionId)); + watchDogThread.setDaemon(true); + watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE); + watchDogThread.start(); + } + } + // ------------------------------------------------------------------------ // Partition State Listeners // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 45dfcd32c0d..fb3058087a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -935,6 +935,36 @@ public class TaskTest 
extends TestLogger { task.getExecutingThread().join(); } + /** + * Tests that the cancellation watch dog reports a fatal error if a task canceled in DEPLOYING + * state remains stuck while instantiating its invokable (e.g. blocking in the constructor). + */ + @Test + public void testWatchDogThrowFatalErrorOnTaskStuckInInstantiation() throws Exception { + final InterruptOnFatalErrorTaskManagerActions taskManagerActions = + new InterruptOnFatalErrorTaskManagerActions(); + + final Configuration config = new Configuration(); + config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5)); + config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(1000L)); + + final Task task = + createTaskBuilder() + .setInvokable(InvokableBlockingInInstantiation.class) + .setTaskManagerConfig(config) + .setTaskManagerActions(taskManagerActions) + .build(Executors.directExecutor()); + taskManagerActions.setExecutingThread(task.getExecutingThread()); + + task.startTaskThread(); + InvokableBlockingInInstantiation.await(); + task.cancelExecution(); + task.getExecutingThread().join(); + + // Expect the watch dog to have reported a fatal error so the TaskManager can recover + assertTrue(taskManagerActions.hasFatalError()); + } + /** * The 'invoke' method holds a lock (trigger awaitLatch after acquisition) and cancel cannot * complete because it also tries to acquire the same lock. This is resolved by the watch dog, @@ -1284,6 +1314,26 @@ public class TaskTest extends TestLogger { } } + /** Customized TaskManagerActions that interrupts the task thread on a fatal error.
*/ + private static class InterruptOnFatalErrorTaskManagerActions extends NoOpTaskManagerActions { + private boolean fatalError = false; + private Thread executingThread; + + @Override + public void notifyFatalError(String message, Throwable cause) { + fatalError = true; + executingThread.interrupt(); + } + + public boolean hasFatalError() { + return fatalError; + } + + public void setExecutingThread(Thread executingThread) { + this.executingThread = executingThread; + } + } + // ------------------------------------------------------------------------ // helper functions // ------------------------------------------------------------------------ @@ -1571,6 +1621,30 @@ public class TaskTest extends TestLogger { public void cancel() {} } + /** {@link AbstractInvokable} which blocks in instantiation. */ + public static final class InvokableBlockingInInstantiation extends AbstractInvokable { + /** Declared static, otherwise there's no way to access it when blocking in constructor. */ + static final OneShotLatch AWAIT_LATCH = new OneShotLatch(); + + public InvokableBlockingInInstantiation(Environment environment) + throws InterruptedException { + super(environment); + while (true) { + synchronized (this) { + AWAIT_LATCH.trigger(); + wait(); + } + } + } + + @Override + public void invoke() {} + + static void await() throws InterruptedException { + AWAIT_LATCH.await(); + } + } + // ------------------------------------------------------------------------ // test exceptions // ------------------------------------------------------------------------