[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584855#comment-15584855
 ] 

ASF GitHub Bot commented on FLINK-4715:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2652#discussion_r83800845
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
    @@ -1251,33 +1289,113 @@ public void run() {
                                catch (InterruptedException e) {
                                        // we can ignore this
                                }
    +                   }
    +                   catch (Throwable t) {
    +                           LOG.error("Error in the task canceler", t);
    +                   }
     
    -                           // it is possible that the user code does not 
react immediately. for that
    -                           // reason, we spawn a separate thread that 
repeatedly interrupts the user code until
    -                           // it exits
    -                           while (executer.isAlive()) {
    -                                   // build the stack trace of where the 
thread is stuck, for the log
    -                                   StringBuilder bld = new StringBuilder();
    -                                   StackTraceElement[] stack = 
executer.getStackTrace();
    -                                   for (StackTraceElement e : stack) {
    -                                           bld.append(e).append('\n');
    -                                   }
    +                   System.out.println("Canceler done");
    +           }
    +   }
    +
    +   /**
    +    * Watchdog for the cancellation. If the task is stuck in cancellation,
    +    * we notify the task manager about a fatal error.
    +    */
    +   private static class TaskCancellationWatchDog extends TimerTask {
    +
    +           /** Thread executing the Task. */
    +           private final Thread executor;
    +
    +           /** Interrupt interval. */
    +           private final long interruptInterval;
    +
    +           /** Timeout after which a fatal error notification happens. */
    +           private final long interruptTimeout;
    +
    +           /** TaskManager to notify about a timeout */
    +           private final TaskManagerConnection taskManager;
     
    -                                   logger.warn("Task '{}' did not react to 
cancelling signal, but is stuck in method:\n {}",
    +           /** Task name (for logging and error messages). */
    +           private final String taskName;
    +
    +           /** Synchronization with the {@link TaskCanceler} thread. */
    +           private final CountDownLatch taskCancellerLatch;
    +
    +           public TaskCancellationWatchDog(
    +                           Thread executor,
    +                           long interruptInterval,
    +                           long interruptTimeout,
    +                           TaskManagerConnection taskManager,
    +                           String taskName,
    +                           CountDownLatch taskCancellerLatch) {
    +
    +                   this.executor = checkNotNull(executor);
    +                   this.interruptInterval = 
checkNotNull(interruptInterval);
    +                   this.interruptTimeout = checkNotNull(interruptTimeout);
    +                   this.taskManager = checkNotNull(taskManager);
    +                   this.taskName = checkNotNull(taskName);
    +                   this.taskCancellerLatch = 
checkNotNull(taskCancellerLatch);
    +           }
    +
    +           @Override
    +           public void run() {
    +                   try {
    +                           // Synchronize with task canceler
    +                           if (!taskCancellerLatch.await(interruptTimeout, 
TimeUnit.MILLISECONDS)) {
    +                                   return; // Did not return
    +                           }
    +                   } catch (InterruptedException e) {
    +                           return;
    +                   }
    +
    +                   long deadline = System.currentTimeMillis() + 
interruptTimeout;
    --- End diff --
    
    Using `System.nanoTime()` is more stable than `System.currentTimeMillis()`. 
Would be good to use, especially if we are dealing with timeouts that want to 
kill the process.


> TaskManager should commit suicide after cancellation failure
> ------------------------------------------------------------
>
>                 Key: FLINK-4715
>                 URL: https://issues.apache.org/jira/browse/FLINK-4715
>             Project: Flink
>          Issue Type: Improvement
>          Components: TaskManager
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Ufuk Celebi
>             Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to