[ 
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138723#comment-17138723
 ] 

Roman Khachatryan commented on FLINK-15467:
-------------------------------------------

I tried to run the program above with some corrections (to stop the job after 
some elements and load other classes).

I couldn't reproduce the issue: the class was loaded successfully after 
stopping all services.

 

[~Ming Li], are you running it on a cluster and then stopping via JM?

 

If yes, this is what I think is happening in your case:
 # Job is canceled in JM
 # JM sends RPC to TM to cancel the task
 # TM calls Task.cancelExecution, which starts TaskCanceler and TaskInterrupter
 # Source thread ignores the interrupt
 # Main task thread proceeds to the finally block in Task.doRun, where it calls 
cancelInvokable and notifyFinalState
 # notifyFinalState -> TaskExecutor.unregisterTaskAndNotifyFinalState -> 
taskSlotTable.removeTask
 # -> TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources, which calls 
closeJob -> BlobLibraryCacheManager.release
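
To make the suspected ordering concrete, here is a minimal plain-Java sketch of the race. It is not Flink code: the class name, the libraryReleased flag (standing in for BlobLibraryCacheManager.release) and the latches are all hypothetical and exist only to force the ordering described in the steps above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelRaceSketch {

    /** Returns true if the source thread was still running after the "release". */
    public static boolean sourceOutlivesRelease() throws InterruptedException {
        // Hypothetical stand-in for the user-code class loader being released.
        AtomicBoolean libraryReleased = new AtomicBoolean(false);
        AtomicBoolean usedAfterRelease = new AtomicBoolean(false);
        CountDownLatch sourceStarted = new CountDownLatch(1);
        CountDownLatch released = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            sourceStarted.countDown();
            // The source swallows interrupts and keeps going.
            while (true) {
                try {
                    released.await();
                    break;
                } catch (InterruptedException ignored) {
                    // ignore the interrupt, as in step 4
                }
            }
            // Here a real source might try to load a class and fail.
            usedAfterRelease.set(libraryReleased.get());
        }, "source-thread");

        source.start();
        sourceStarted.await();

        // "Cancellation": interrupt the source, then release without joining it.
        source.interrupt();
        libraryReleased.set(true);
        released.countDown();

        source.join(); // only so that the sketch can report the result
        return usedAfterRelease.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("source ran after release: " + sourceOutlivesRelease());
    }
}
```

In this sketch the source thread always observes the released state, because nothing makes the cancelling thread wait for it.
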

If this is the case, then just joining the source thread inside cancelInvokable 
would solve the problem (specifically, in SourceStreamTask.cancelTask).
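
A rough sketch of the joining idea, again in plain Java rather than actual Flink code. The running flag and the cancel/interrupt/join sequence are assumptions about what a cancelTask-like method could do, not the real SourceStreamTask implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class JoinOnCancelSketch {

    /** Returns true if the source thread had finished before resources would be released. */
    public static boolean sourceFinishedBeforeRelease() throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);        // hypothetical cancel flag
        AtomicBoolean sourceFinished = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            started.countDown();
            while (running.get()) {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException ignored) {
                    // the source ignores interrupts and only honours the flag
                }
            }
            // User cleanup (closing child threads, deleting local files) would go here.
            sourceFinished.set(true);
        }, "source-thread");

        source.start();
        started.await();

        // Hypothetical cancelTask(): signal cancellation, interrupt, then wait.
        running.set(false);
        source.interrupt();
        source.join();

        // Only now would the caller go on to release the class loader.
        return sourceFinished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("source finished before release: " + sourceFinishedBeforeRelease());
    }
}
```

An unbounded join can hang if the source never returns, so a real change would probably need a timeout (Thread.join(long)) or some other bound.
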

 

SourceStreamTask.cancelTask is called from:
 * RPC notification about checkpoint completion - fine, done asynchronously
 * TaskCanceler - can delay the subsequent networkResourcesCloser.run(), which 
is probably also fine
 * Task.cancelInvokable - what we want

 

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1, 1.10.1
>            Reporter: ming li
>            Assignee: Roman Khachatryan
>            Priority: Critical
>             Fix For: 1.12.0
>
>
>      In the new mailbox model, SourceStreamTask starts a source thread to run 
> user methods, and the current execution thread blocks on 
> mailbox.takeMail(). When a task is canceled, the TaskCanceler thread cancels 
> the task and interrupts the execution thread. The execution thread of 
> SourceStreamTask therefore throws InterruptedException, cancels the task 
> again, and rethrows the exception.
> {code:java}
> // code placeholder
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>    // Against the usual contract of this method, this implementation is not
>    // step-wise but blocking instead for compatibility reasons with the
>    // current source interface (source functions run as a loop, not in steps).
>    sourceThread.start();
>    // We run an alternative mailbox loop that does not involve default
>    // actions and synchronizes around actions.
>    try {
>       runAlternativeMailboxLoop();
>    } catch (Exception mailboxEx) {
>       // We cancel the source function if some runtime exception escaped the
>       // mailbox.
>       if (!isCanceled()) {
>          cancelTask();
>       }
>       throw mailboxEx;
>    }
>    sourceThread.join();
>    if (!isFinished) {
>       sourceThread.checkThrowSourceExecutionException();
>    }
>    context.allActionsCompleted();
> }
> {code}
>    When all tasks of this TaskExecutor are canceled, the blob files are 
> cleaned up. But the actual source thread has not finished at this point, so 
> loading a new class in it fails with a ClassNotFoundException. In this case, 
> the source thread may not be able to properly clean up and release resources 
> (such as closing child threads, cleaning up local files, etc.). Therefore, I 
> think we should mark this task as canceled or finished only after the source 
> thread has completed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
