[
https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123730#comment-17123730
]
ming li commented on FLINK-15467:
---------------------------------
Below is my test code:
{code:java}
package com.flink.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by liming on 2020/6/2.
 */
public class TestJobClean {
    private final Logger logger = LoggerFactory.getLogger(TestLoad.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CustomSource())
            .addSink(new DiscardingSink<String>());
        env.execute("testJobCancel");
    }

    private static class CustomSource implements ParallelSourceFunction<String> {
        private final Logger logger = LoggerFactory.getLogger(CustomSource.class);

        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            try {
                while (running) {
                    Thread.sleep(1000);
                    sourceContext.collect("test");
                }
            } finally {
                try {
                    Thread.sleep(3000); // Simulate preparatory work
                } catch (InterruptedException e) {
                    // Interrupted by the cancellation; keep simulating the work
                    Thread.sleep(3000); // Simulate preparatory work
                }
                try {
                    logger.info("try to load clean class");
                    // Try to load a not-yet-loaded class to do the cleanup
                    Class.forName("com.flink.test.TestLoad");
                    logger.info("load clean class success");
                } catch (Exception e) {
                    logger.error("load clean class failed", e);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
{code}
The logs are as follows:
2020-06-02 20:34:24,368 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from RUNNING to CANCELING.
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from CANCELING to CANCELED.
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,372 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) [CANCELED]
2020-06-02 20:34:24,373 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Sink: Unnamed (1/1) 10ee411efd563d83214fd6f947dcad1e.
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: e42db2155c2aa58e98c1dce598ecb7f5, jobId: 36242552141952de9bc8f3867f9d6f6d).
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job 36242552141952de9bc8f3867f9d6f6d from job leader monitoring.
2020-06-02 20:34:24,384 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,386 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,387 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job 36242552141952de9bc8f3867f9d6f6d because it is not registered.
2020-06-02 20:34:27,375 INFO com.flink.test.TestJobClean$CustomSource - try to load clean class
2020-06-02 20:34:27,376 ERROR com.flink.test.TestJobClean$CustomSource - load clean class failed
java.lang.ClassNotFoundException: com.flink.test.TestLoad
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at com.flink.test.TestJobClean$CustomSource.run(TestJobClean.java:40)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
> Key: FLINK-15467
> URL: https://issues.apache.org/jira/browse/FLINK-15467
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.9.0, 1.9.1
> Reporter: ming li
> Priority: Major
>
> In the new mailbox model, SourceStreamTask starts a separate source thread to
> run the user's source function, while the task's execution thread blocks on
> mailbox.takeMail(). When a task is canceled, the TaskCanceler thread cancels
> the task and interrupts the execution thread. The execution thread of
> SourceStreamTask therefore throws an InterruptedException, cancels the task
> again, and rethrows the exception without waiting for the source thread:
> {code:java}
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>     // Against the usual contract of this method, this implementation is not
>     // step-wise but blocking instead for compatibility reasons with the
>     // current source interface (source functions run as a loop, not in steps).
>     sourceThread.start();
>     // We run an alternative mailbox loop that does not involve default
>     // actions and synchronizes around actions.
>     try {
>         runAlternativeMailboxLoop();
>     } catch (Exception mailboxEx) {
>         // We cancel the source function if some runtime exception escaped
>         // the mailbox.
>         if (!isCanceled()) {
>             cancelTask();
>         }
>         throw mailboxEx;
>     }
>     sourceThread.join();
>     if (!isFinished) {
>         sourceThread.checkThrowSourceExecutionException();
>     }
>     context.allActionsCompleted();
> }
> {code}
> When all tasks of this TaskExecutor are canceled, the blob files are cleaned
> up. However, the source thread has not actually finished at this point, so
> any attempt to load a new class in it fails with a ClassNotFoundException.
> As a result, the source thread may be unable to clean up and release its
> resources properly (for example, closing child threads or deleting local
> files). I therefore think the task should be marked as canceled or finished
> only after the source thread has completed.
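
The ordering problem described above can be illustrated with a minimal plain-JDK sketch. It does not use Flink at all: SourceThreadJoinSketch, SlowCleanupSource and cleanupFinishedBeforeReport are hypothetical names invented for this illustration, and the 200 ms sleep merely stands in for cleanup work in a source's finally block. It only shows that the point at which a task would be reported as CANCELED can come before or after the source thread's cleanup, depending on whether the source thread is joined first.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-JDK illustration; the class and method names are hypothetical, not Flink APIs. */
final class SourceThreadJoinSketch {

    /** Stand-in for a source function whose finally block does slow cleanup. */
    static final class SlowCleanupSource implements Runnable {
        private volatile boolean running = true;
        final AtomicBoolean cleanupDone = new AtomicBoolean(false);

        @Override
        public void run() {
            try {
                while (running) {
                    Thread.sleep(10); // simulate emitting records
                }
            } catch (InterruptedException e) {
                // Not expected in this sketch; continue to the cleanup below.
            } finally {
                try {
                    Thread.sleep(200); // simulate cleanup work
                    cleanupDone.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        void cancel() {
            running = false;
        }
    }

    /**
     * Cancels the source and returns whether its cleanup had finished at the
     * point where the task would be reported as CANCELED.
     */
    static boolean cleanupFinishedBeforeReport(boolean joinSourceThread) {
        SlowCleanupSource source = new SlowCleanupSource();
        Thread sourceThread = new Thread(source, "source-thread");
        sourceThread.start();
        source.cancel();
        try {
            if (joinSourceThread) {
                sourceThread.join(); // wait for the finally block to complete
            }
            boolean finished = source.cleanupDone.get(); // "report CANCELED" here
            sourceThread.join(); // always let the thread end before returning
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("without join: " + cleanupFinishedBeforeReport(false));
        System.out.println("with join:    " + cleanupFinishedBeforeReport(true));
    }
}
```

Without the join, the state is read while the simulated cleanup is still sleeping, which corresponds to the task being reported as CANCELED (and its resources released) before the source thread is done. With the join, the cleanup has always completed first. How such a wait should be integrated into SourceStreamTask is a separate design question that this sketch does not address.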