[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585309#comment-15585309
 ] 

ASF GitHub Bot commented on FLINK-4715:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2652#discussion_r83839720
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
    @@ -1251,33 +1299,124 @@ public void run() {
                                catch (InterruptedException e) {
                                        // we can ignore this
                                }
    +                   }
    +                   catch (Throwable t) {
    +                           logger.error("Error in the task canceler", t);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Watchdog for the cancellation. If the task is stuck in cancellation,
    +    * we notify the task manager about a fatal error.
    +    */
    +   private static class TaskCancellationWatchDog extends TimerTask {
    +
    +           /**
    +            * Pass logger in order to prevent that the compiler needs to 
inject static bridge methods
    +            * to access it.
    +            */
    +           private final Logger logger;
    +
    +           /** Thread executing the Task. */
    +           private final Thread executor;
    +
    +           /** Interrupt interval. */
    +           private final long interruptInterval;
    +
    +           /** Timeout after which a fatal error notification happens. */
    +           private final long interruptTimeout;
    +
    +           /** TaskManager to notify about a timeout */
    +           private final TaskManagerConnection taskManager;
    +
    +           /** Task name (for logging and error messages). */
    +           private final String taskName;
    +
    +           /** Synchronization with the {@link TaskCanceler} thread. */
    +           private final CountDownLatch taskCancellerLatch;
    +
    +           public TaskCancellationWatchDog(
    +                           Logger logger,
    +                           Thread executor,
    +                           long interruptInterval,
    +                           long interruptTimeout,
    +                           TaskManagerConnection taskManager,
    +                           String taskName,
    +                           CountDownLatch taskCancellerLatch) {
    +
    +                   this.logger = checkNotNull(logger);
    +                   this.executor = checkNotNull(executor);
    +                   this.interruptInterval = 
checkNotNull(interruptInterval);
    +                   this.interruptTimeout = checkNotNull(interruptTimeout);
    +                   this.taskManager = checkNotNull(taskManager);
    +                   this.taskName = checkNotNull(taskName);
    +                   this.taskCancellerLatch = 
checkNotNull(taskCancellerLatch);
    +           }
    +
    +           @Override
    +           public void run() {
    +                   try {
    +                           // Synchronize with task canceler
    +                           taskCancellerLatch.await();
    +                   } catch (Exception e) {
    +                           String msg = String.format("Exception while 
waiting on task " +
    +                                           "canceller to cancel task 
'%s'.", taskName);
    +                           taskManager.notifyFatalError(msg, e);
    +                           return;
    +                   }
    +
    +                   long intervalNanos = 
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
    +                   long timeoutNanos = 
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
    +                   long deadline = System.nanoTime() + timeoutNanos;
    +
    +                   try {
    +                           // Initial wait before interrupting periodically
    +                           Thread.sleep(interruptInterval);
    +                   } catch (InterruptedException ignored) {
    +                   }
    +
    +                   // It is possible that the user code does not react to 
the task canceller.
    +                   // for that reason, we spawn this separate thread that 
repeatedly interrupts
    +                   // the user code until it exits. If the suer user code 
does not exit within
    +                   // the timeout, we notify the job manager about a fatal 
error.
    +                   while (executor.isAlive()) {
    +                           long now = System.nanoTime();
    +
    +                           // build the stack trace of where the thread is 
stuck, for the log
    +                           StringBuilder bld = new StringBuilder();
    +                           StackTraceElement[] stack = 
executor.getStackTrace();
    +                           for (StackTraceElement e : stack) {
    +                                   bld.append(e).append('\n');
    +                           }
     
    -                           // it is possible that the user code does not 
react immediately. for that
    -                           // reason, we spawn a separate thread that 
repeatedly interrupts the user code until
    -                           // it exits
    -                           while (executer.isAlive()) {
    -                                   // build the stack trace of where the 
thread is stuck, for the log
    -                                   StringBuilder bld = new StringBuilder();
    -                                   StackTraceElement[] stack = 
executer.getStackTrace();
    -                                   for (StackTraceElement e : stack) {
    -                                           bld.append(e).append('\n');
    -                                   }
    +                           if (now >= deadline) {
    +                                   long duration = 
TimeUnit.SECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
    +                                   String msg = String.format("Task '%s' 
did not react to cancelling signal in " +
    +                                                   "the last %d seconds, 
but is stuck in method:\n %s",
    +                                                   taskName,
    +                                                   duration,
    +                                                   bld.toString());
    +
    +                                   taskManager.notifyFatalError(msg, null);
     
    +                                   return; // done, don't forget to leave 
the loop
    +                           } else {
                                        logger.warn("Task '{}' did not react to 
cancelling signal, but is stuck in method:\n {}",
                                                        taskName, 
bld.toString());
     
    -                                   executer.interrupt();
    +                                   executor.interrupt();
                                        try {
    -                                           
executer.join(taskCancellationIntervalMillis);
    -                                   }
    -                                   catch (InterruptedException e) {
    -                                           // we can ignore this
    +                                           long timeLeftNanos = 
Math.min(intervalNanos, deadline - now - intervalNanos);
    --- End diff --
    
    Is this line correct? Should it not be `Math.min(intervalNanos, deadline - now);`?


> TaskManager should commit suicide after cancellation failure
> ------------------------------------------------------------
>
>                 Key: FLINK-4715
>                 URL: https://issues.apache.org/jira/browse/FLINK-4715
>             Project: Flink
>          Issue Type: Improvement
>          Components: TaskManager
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Ufuk Celebi
>             Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to