[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585309#comment-15585309 ]
ASF GitHub Bot commented on FLINK-4715: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2652#discussion_r83839720 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -1251,33 +1299,124 @@ public void run() { catch (InterruptedException e) { // we can ignore this } + } + catch (Throwable t) { + logger.error("Error in the task canceler", t); + } + } + } + + /** + * Watchdog for the cancellation. If the task is stuck in cancellation, + * we notify the task manager about a fatal error. + */ + private static class TaskCancellationWatchDog extends TimerTask { + + /** + * Pass logger in order to prevent that the compiler needs to inject static bridge methods + * to access it. + */ + private final Logger logger; + + /** Thread executing the Task. */ + private final Thread executor; + + /** Interrupt interval. */ + private final long interruptInterval; + + /** Timeout after which a fatal error notification happens. */ + private final long interruptTimeout; + + /** TaskManager to notify about a timeout */ + private final TaskManagerConnection taskManager; + + /** Task name (for logging and error messages). */ + private final String taskName; + + /** Synchronization with the {@link TaskCanceler} thread. 
*/ + private final CountDownLatch taskCancellerLatch; + + public TaskCancellationWatchDog( + Logger logger, + Thread executor, + long interruptInterval, + long interruptTimeout, + TaskManagerConnection taskManager, + String taskName, + CountDownLatch taskCancellerLatch) { + + this.logger = checkNotNull(logger); + this.executor = checkNotNull(executor); + this.interruptInterval = checkNotNull(interruptInterval); + this.interruptTimeout = checkNotNull(interruptTimeout); + this.taskManager = checkNotNull(taskManager); + this.taskName = checkNotNull(taskName); + this.taskCancellerLatch = checkNotNull(taskCancellerLatch); + } + + @Override + public void run() { + try { + // Synchronize with task canceler + taskCancellerLatch.await(); + } catch (Exception e) { + String msg = String.format("Exception while waiting on task " + + "canceller to cancel task '%s'.", taskName); + taskManager.notifyFatalError(msg, e); + return; + } + + long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS); + long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS); + long deadline = System.nanoTime() + timeoutNanos; + + try { + // Initial wait before interrupting periodically + Thread.sleep(interruptInterval); + } catch (InterruptedException ignored) { + } + + // It is possible that the user code does not react to the task canceller. + // for that reason, we spawn this separate thread that repeatedly interrupts + // the user code until it exits. If the suer user code does not exit within + // the timeout, we notify the job manager about a fatal error. + while (executor.isAlive()) { + long now = System.nanoTime(); + + // build the stack trace of where the thread is stuck, for the log + StringBuilder bld = new StringBuilder(); + StackTraceElement[] stack = executor.getStackTrace(); + for (StackTraceElement e : stack) { + bld.append(e).append('\n'); + } - // it is possible that the user code does not react immediately. 
for that - // reason, we spawn a separate thread that repeatedly interrupts the user code until - // it exits - while (executer.isAlive()) { - // build the stack trace of where the thread is stuck, for the log - StringBuilder bld = new StringBuilder(); - StackTraceElement[] stack = executer.getStackTrace(); - for (StackTraceElement e : stack) { - bld.append(e).append('\n'); - } + if (now >= deadline) { + long duration = TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS); + String msg = String.format("Task '%s' did not react to cancelling signal in " + + "the last %d seconds, but is stuck in method:\n %s", + taskName, + duration, + bld.toString()); + + taskManager.notifyFatalError(msg, null); + return; // done, don't forget to leave the loop + } else { logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", taskName, bld.toString()); - executer.interrupt(); + executor.interrupt(); try { - executer.join(taskCancellationIntervalMillis); - } - catch (InterruptedException e) { - // we can ignore this + long timeLeftNanos = Math.min(intervalNanos, deadline - now - intervalNanos); --- End diff -- Is this line correct? Should it not be `Math.min(intervalNanos, deadline - now)`? Subtracting `intervalNanos` a second time means that, once fewer than two intervals remain before the deadline, the wait is one interval shorter than the time actually left, and once fewer than one interval remains the result becomes negative. > TaskManager should commit suicide after cancellation failure > ------------------------------------------------------------ > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager > Affects Versions: 1.2.0 > Reporter: Till Rohrmann > Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)