[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124822#comment-17124822
 ] 

Piotr Nowojski edited comment on FLINK-15467 at 6/3/20, 10:15 AM:
------------------------------------------------------------------

Thanks for coming back and providing a way to reproduce the problem. I haven't 
run your code, but it looks like you are right. I'm not entirely sure how to 
fix this off the top of my head; someone would have to investigate this 
further.

Also, the problem might be a bit more profound, and the solution should take 
into account operators spawning custom threads as well.



> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Priority: Critical
>             Fix For: 1.12.0
>
>
>      In the new mailbox model, SourceStreamTask starts a source thread to run 
> user methods, and the current execution thread blocks on 
> mailbox.takeMail(). When a task is canceled, the TaskCanceler thread cancels 
> the task and interrupts the execution thread. The execution thread of 
> SourceStreamTask therefore throws an InterruptedException, cancels the task 
> again, and rethrows the exception, so sourceThread.join() is never reached.
> {code:java}
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>    // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>    // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>    sourceThread.start();
>    // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>    try {
>       runAlternativeMailboxLoop();
>    } catch (Exception mailboxEx) {
>       // We cancel the source function if some runtime exception escaped the mailbox.
>       if (!isCanceled()) {
>          cancelTask();
>       }
>       throw mailboxEx;
>    }
>    sourceThread.join();
>    if (!isFinished) {
>       sourceThread.checkThrowSourceExecutionException();
>    }
>    context.allActionsCompleted();
> }
> {code}
>    When all tasks of this TaskExecutor are canceled, the blob files are 
> cleaned up. But the source thread has not actually finished at this point, 
> so it gets a ClassNotFoundException when it tries to load a new class. In 
> this case, the source thread may not be able to clean up properly and release 
> its resources (such as closing child threads, cleaning up local files, etc.). 
> Therefore, I think we should mark this task as canceled or finished only 
> after the source thread has completed.
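
For illustration only, here is a minimal plain-Java sketch of the behavior the description asks for: when the task thread is interrupted, it cancels the source and still joins the source thread before it returns. This is not Flink code and not a proposed patch. The class SourceJoinSketch, the Source stand-in, and the helpers joinUninterruptibly and cancelAndWait are all hypothetical names, and a CountDownLatch stands in for the blocking mailbox loop. It does not address operators that spawn their own threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Flink code: all names here are made up for illustration.
class SourceJoinSketch {

    /** Stand-in for a user source: loops until canceled, then does slow cleanup. */
    static final class Source implements Runnable {
        private volatile boolean running = true;
        final CountDownLatch started = new CountDownLatch(1);
        final AtomicBoolean cleanedUp = new AtomicBoolean(false);

        @Override
        public void run() {
            started.countDown();
            try {
                while (running) {
                    Thread.sleep(10); // pretend to emit records
                }
                Thread.sleep(100);    // pretend cleanup takes a while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                cleanedUp.set(true);
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Joins the given thread even if the caller is interrupted while waiting. */
    static void joinUninterruptibly(Thread t) {
        boolean interrupted = false;
        while (true) {
            try {
                t.join();
                break;
            } catch (InterruptedException e) {
                interrupted = true; // remember it and keep waiting
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
        }
    }

    /** Returns true if source cleanup had finished by the time the task thread exited. */
    static boolean cancelAndWait() {
        Source source = new Source();
        Thread sourceThread = new Thread(source, "source-thread");
        CountDownLatch mailbox = new CountDownLatch(1); // never released: blocks like takeMail()

        Thread task = new Thread(() -> {
            sourceThread.start();
            try {
                mailbox.await();
            } catch (InterruptedException e) {
                source.cancel(); // the interrupt means "cancel", so stop the source
            } finally {
                joinUninterruptibly(sourceThread); // wait on every exit path
            }
        }, "task-thread");

        try {
            task.start();
            source.started.await(); // make sure the source thread is running
            task.interrupt();       // what the TaskCanceler thread would do
            task.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return source.cleanedUp.get();
    }

    public static void main(String[] args) {
        System.out.println("source cleaned up before task exit: " + cancelAndWait());
    }
}
```

The join sits in a finally block so that the task thread cannot report completion, on either the normal or the exceptional path, while the source thread is still running.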



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
