[
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138723#comment-17138723
]
Roman Khachatryan commented on FLINK-15467:
-------------------------------------------
I tried to run the program above with a few modifications (stopping the job after some elements and loading additional classes).
I couldn't reproduce the issue: the class was loaded successfully after
stopping all services.
[~Ming Li], are you running it on a cluster and then stopping it via the JM?
If so, here is what I think is happening in your case:
# Job is canceled in the JM
# JM sends an RPC to the TM to cancel the task
# TM calls Task.cancelExecution, which starts TaskCanceler and TaskInterrupter
# Source thread ignores the interrupt
# Main task thread proceeds to the finally block in Task.doRun, where it calls cancelInvokable and notifyFinalState >>
# >> TaskExecutor.unregisterTaskAndNotifyFinalState > taskSlotTable.removeTask >
# >> TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources, which calls closeJob > BlobLibraryCacheManager.release
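A minimal, self-contained Java sketch of the sequence above, using plain threads only. None of these names are Flink classes: the flags are hypothetical stand-ins for taskSlotTable.removeTask and BlobLibraryCacheManager.release, and a blocking queue stands in for the mailbox. The source thread swallows interrupts, the task thread is interrupted while blocked, and its finally block "releases" the libraries without joining the source thread, so the source thread still runs user code afterwards:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancellationRaceSketch {

    /** Returns true if the (hypothetical) source thread ran user code after the libraries were released. */
    static boolean runScenario() {
        AtomicBoolean librariesReleased = new AtomicBoolean(false); // stand-in for BlobLibraryCacheManager.release()
        AtomicBoolean usedAfterRelease = new AtomicBoolean(false);
        CountDownLatch sourceStarted = new CountDownLatch(1);
        CountDownLatch releaseDone = new CountDownLatch(1);

        // Step 4: a user source that swallows interrupts and keeps running.
        Thread sourceThread = new Thread(() -> {
            sourceStarted.countDown();
            boolean done = false;
            while (!done) {
                try {
                    releaseDone.await(); // keeps "working" until the task has been torn down
                    done = true;
                } catch (InterruptedException ignored) {
                    // Interrupt ignored, like a badly behaved source function.
                }
            }
            // Loading a new user class here would now fail with ClassNotFoundException.
            if (librariesReleased.get()) {
                usedAfterRelease.set(true);
            }
        }, "source");

        // Step 5: the main task thread blocks, is interrupted, and runs its finally
        // block without waiting for the source thread.
        Thread taskThread = new Thread(() -> {
            sourceThread.start();
            try {
                new LinkedBlockingQueue<Runnable>().take(); // stand-in for mailbox.takeMail()
            } catch (InterruptedException e) {
                // Cancellation: fall through to finally.
            } finally {
                librariesReleased.set(true); // steps 6-7 (simplified): unregister task, release libraries
                releaseDone.countDown();
            }
        }, "task");

        taskThread.start();
        try {
            sourceStarted.await();
            // Step 3 (simplified): the cancellation path interrupts both threads.
            sourceThread.interrupt();
            taskThread.interrupt();
            taskThread.join();
            sourceThread.join(); // only this demo joins; the task thread above did not
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return usedAfterRelease.get();
    }

    public static void main(String[] args) {
        System.out.println("source used released libraries: " + runScenario());
    }
}
```

Running main prints "source used released libraries: true".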
If this is what happens, then simply joining the source thread inside cancelInvokable (specifically, in SourceStreamTask.cancelTask) would solve the problem.
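A sketch of the proposed change, assuming a source function that ignores interrupts but eventually honors its cancel flag. JoiningCancelSketch and its fields are hypothetical; cancelTask here only mirrors the intent of SourceStreamTask.cancelTask (cancel, interrupt, then join), not its actual code. Because the join returns only after the source thread has exited, the release step can no longer overlap with user code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class JoiningCancelSketch {
    // Hypothetical stand-in for BlobLibraryCacheManager.release() having run.
    private final AtomicBoolean librariesReleased = new AtomicBoolean(false);
    // Set if the source thread touched user code after the release.
    private final AtomicBoolean usedAfterRelease = new AtomicBoolean(false);
    // Cancel flag of the hypothetical user source function.
    private volatile boolean running = true;

    private final Thread sourceThread = new Thread(this::runSource, "source");

    private void runSource() {
        while (running) {
            try {
                Thread.sleep(5); // "emit records"
            } catch (InterruptedException ignored) {
                // This source swallows interrupts and relies only on the cancel flag.
            }
        }
        // Slow cleanup after leaving the loop (~50 ms, not interruptible),
        // e.g. loading a class that was not loaded before.
        long deadline = System.nanoTime() + 50_000_000L;
        while (System.nanoTime() < deadline) {
            Thread.yield();
        }
        if (librariesReleased.get()) {
            usedAfterRelease.set(true);
        }
    }

    /** Sketch of the proposed cancelTask(): cancel, interrupt, then wait for the source thread. */
    void cancelTask() throws InterruptedException {
        running = false;          // like SourceFunction.cancel()
        sourceThread.interrupt(); // may be ignored by user code
        sourceThread.join();      // the proposed change: wait until the source thread has exited
    }

    /** Starts the source, cancels it, then releases libraries as notifyFinalState eventually would. */
    boolean runCancelAndRelease() {
        sourceThread.start();
        try {
            cancelTask(); // Task.cancelInvokable -> SourceStreamTask.cancelTask (simplified)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        librariesReleased.set(true); // notifyFinalState -> ... -> BlobLibraryCacheManager.release
        return usedAfterRelease.get();
    }

    public static void main(String[] args) {
        System.out.println("source used released libraries: "
                + new JoiningCancelSketch().runCancelAndRelease());
    }
}
```

Running main prints "source used released libraries: false". Note that an unbounded join waits as long as the source needs to exit.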
SourceStreamTask.cancelTask is called from:
* RPC notification about checkpoint completion: fine, this is done asynchronously
* TaskCanceler: can delay the subsequent networkResourcesCloser.run(), which is probably also fine
* Task.cancelInvokable: what we want
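To illustrate the TaskCanceler point, a hypothetical ordering sketch with plain threads (networkResourcesCloser below is only a Runnable stand-in, not Flink's field, and the canceler body is simplified). With the join inside cancelTask, the network resources are closed only after the source thread has exited, i.e. delayed by however long the source needs to stop (about 100 ms here):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class CancelerDelaySketch {

    /** Returns the order in which the hypothetical cancellation path observed events. */
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());

        // A source that needs ~100 ms to stop and does not react to interrupts.
        Thread sourceThread = new Thread(() -> {
            long deadline = System.nanoTime() + 100_000_000L;
            while (System.nanoTime() < deadline) {
                Thread.yield();
            }
            events.add("source thread exited");
        }, "source");

        // Hypothetical stand-in for the network resources closer run after cancelling the invokable.
        Runnable networkResourcesCloser = () -> events.add("network resources closed");

        // Simplified canceler body: cancel the invokable (which now joins), then close network resources.
        Thread canceler = new Thread(() -> {
            sourceThread.interrupt();
            try {
                sourceThread.join(); // the proposed join inside cancelTask
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("cancelTask returned");
            networkResourcesCloser.run();
        }, "canceler");

        sourceThread.start();
        canceler.start();
        try {
            canceler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints "[source thread exited, cancelTask returned, network resources closed]".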
> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.9.0, 1.9.1, 1.10.1
> Reporter: ming li
> Assignee: Roman Khachatryan
> Priority: Critical
> Fix For: 1.12.0
>
>
> In the new mailbox model, SourceStreamTask starts a source thread to run
> user methods, and the current execution thread blocks on mailbox.takeMail().
> When a task is canceled, the TaskCanceler thread cancels the task and
> interrupts the execution thread. The execution thread of SourceStreamTask
> therefore throws an InterruptedException, cancels the task again, and
> rethrows the exception.
> {code:java}
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>     // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>     // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>     sourceThread.start();
>     // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>     try {
>         runAlternativeMailboxLoop();
>     } catch (Exception mailboxEx) {
>         // We cancel the source function if some runtime exception escaped the mailbox.
>         if (!isCanceled()) {
>             cancelTask();
>         }
>         throw mailboxEx;
>     }
>     sourceThread.join();
>     if (!isFinished) {
>         sourceThread.checkThrowSourceExecutionException();
>     }
>     context.allActionsCompleted();
> }
> {code}
> When all tasks of this TaskExecutor are canceled, the blob files are
> cleaned up. However, the source thread may still be running at this point,
> so loading a new class fails with a ClassNotFoundException. In this case,
> the source thread may also be unable to clean up and release its resources
> properly (e.g. closing child threads or deleting local files). Therefore, I
> think we should mark the task as canceled or finished only after the source
> thread has completed.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)