[ https://issues.apache.org/jira/browse/FLINK-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123730#comment-17123730 ]

ming li commented on FLINK-15467:
---------------------------------

Below is my test code:
{code:java}
package com.flink.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by liming on 2020/6/2.
 */
public class TestJobClean {
    private final Logger logger = LoggerFactory.getLogger(TestJobClean.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CustomSource())
                .addSink(new DiscardingSink<String>());
        env.execute("testJobCancel");
    }

    private static class CustomSource implements ParallelSourceFunction<String> {
        private final Logger logger = LoggerFactory.getLogger(CustomSource.class);
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        public void run(SourceContext<String> sourceContext) throws Exception {
            try {
                while (running) {
                    Thread.sleep(1000);
                    sourceContext.collect("test");
                }
            } finally {
                try {
                    Thread.sleep(3000); // Simulate preparatory work
                } catch (InterruptedException e) {
                    // Sleep again in case the first sleep is interrupted by the cancellation
                    Thread.sleep(3000); // Simulate preparatory work
                }
                try {
                    logger.info("try to load clean class");
                    Class.forName("com.flink.test.TestLoad"); // try to load some class to do the cleanup
                    logger.info("load clean class success");
                } catch (Exception e) {
                    logger.error("load clean class failed", e);
                }

            }
        }

        public void cancel() {
            running = false;
        }
    }
}
{code}
The logs are as follows:
{noformat}
2020-06-02 20:34:24,368 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from RUNNING to CANCELING.
2020-06-02 20:34:24,369 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) switched from CANCELING to CANCELED.
2020-06-02 20:34:24,371 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e).
2020-06-02 20:34:24,372 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (10ee411efd563d83214fd6f947dcad1e) [CANCELED]
2020-06-02 20:34:24,373 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Sink: Unnamed (1/1) 10ee411efd563d83214fd6f947dcad1e.
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: e42db2155c2aa58e98c1dce598ecb7f5, jobId: 36242552141952de9bc8f3867f9d6f6d).
2020-06-02 20:34:24,383 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job 36242552141952de9bc8f3867f9d6f6d from job leader monitoring.
2020-06-02 20:34:24,384 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,386 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 36242552141952de9bc8f3867f9d6f6d.
2020-06-02 20:34:24,387 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job 36242552141952de9bc8f3867f9d6f6d because it is not registered.
2020-06-02 20:34:27,375 INFO com.flink.test.TestJobClean$CustomSource - try to load clean class
2020-06-02 20:34:27,376 ERROR com.flink.test.TestJobClean$CustomSource - load clean class failed
java.lang.ClassNotFoundException: com.flink.test.TestLoad
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at com.flink.test.TestJobClean$CustomSource.run(TestJobClean.java:40)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
{noformat}
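The ordering in the log (task CANCELED and slot freed at 20:34:24, source cleanup failing at 20:34:27) can be reproduced with a minimal plain-JDK model. This is a sketch, not Flink code: `MailboxCancelRace`, `resourcesReleased` and the timings are hypothetical. A task thread blocks on a mailbox (`BlockingQueue#take`, standing in for `mailbox.takeMail()`) while a separate source thread runs the user loop; cancellation clears the `running` flag and interrupts only the task thread. Because the task thread "releases resources" without joining the source thread, the source's slow `finally` cleanup finds them already gone, analogous to the `ClassNotFoundException` above.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-JDK model of the FLINK-15467 race (hypothetical, not Flink code). */
public class MailboxCancelRace {
    private static volatile boolean running;
    private static volatile boolean resourcesReleased; // stands in for blob / class-loader cleanup
    private static volatile boolean cleanupSawRelease; // what the source's finally block observed

    /** Returns true if the source thread's cleanup ran after resources were already released. */
    public static boolean cleanupRanAfterRelease() throws InterruptedException {
        running = true;
        resourcesReleased = false;
        cleanupSawRelease = false;
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        CountDownLatch sourceStarted = new CountDownLatch(1);

        Thread sourceThread = new Thread(() -> {
            sourceStarted.countDown();
            try {
                while (running) {
                    sleepQuietly(10); // "emit a record"
                }
            } finally {
                sleepQuietly(200); // slow cleanup, like the 3 s sleep in the test job
                cleanupSawRelease = resourcesReleased;
            }
        }, "legacy-source-thread");

        Thread taskThread = new Thread(() -> {
            sourceThread.start();
            try {
                mailbox.take(); // blocks, like the task thread waiting on its mailbox
            } catch (InterruptedException e) {
                running = false; // cancel the source function again
            }
            // No sourceThread.join() here: the task is treated as done immediately.
            resourcesReleased = true;
        }, "task-thread");

        taskThread.start();
        sourceStarted.await();
        running = false;        // canceler: cancel the source function ...
        taskThread.interrupt(); // ... and interrupt the task's executing thread
        taskThread.join();
        sourceThread.join();
        return cleanupSawRelease;
    }

    /** Sleeps for the full duration, ignoring interrupts. */
    private static void sleepQuietly(long millis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        long remaining;
        while ((remaining = deadline - System.nanoTime()) > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(remaining);
            } catch (InterruptedException ignored) {
                // keep sleeping, like the catch block in the test job
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("cleanup ran after release: " + cleanupRanAfterRelease());
    }
}
{code}

In practice `cleanupRanAfterRelease()` returns `true`. The 200 ms cleanup delay only makes the ordering reliable for a demo, so this is a timing-based illustration rather than a deterministic test.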

> Should wait for the end of the source thread during the Task cancellation
> -------------------------------------------------------------------------
>
>                 Key: FLINK-15467
>                 URL: https://issues.apache.org/jira/browse/FLINK-15467
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0, 1.9.1
>            Reporter: ming li
>            Priority: Major
>
>      In the new mailbox model, SourceStreamTask starts a separate source thread to run
> the user's source function, while the current execution thread blocks on
> mailbox.takeMail(). When a task is canceled, the TaskCanceler thread cancels the
> task and interrupts the execution thread. As a result, the execution thread of
> SourceStreamTask throws an InterruptedException, cancels the task again, and
> rethrows the exception.
> {code:java}
> @Override
> protected void performDefaultAction(ActionContext context) throws Exception {
>    // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
>    // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
>    sourceThread.start();
>    // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
>    try {
>       runAlternativeMailboxLoop();
>    } catch (Exception mailboxEx) {
>       // We cancel the source function if some runtime exception escaped the mailbox.
>       if (!isCanceled()) {
>          cancelTask();
>       }
>       throw mailboxEx;
>    }
>    sourceThread.join();
>    if (!isFinished) {
>       sourceThread.checkThrowSourceExecutionException();
>    }
>    context.allActionsCompleted();
> }
> {code}
>    When all tasks of this TaskExecutor have been canceled, the blob files are
> cleaned up. However, the actual source thread has not finished at that point, so
> loading a new class from it fails with a ClassNotFoundException. In this case, the
> source thread may be unable to clean up and release its resources properly (for
> example, shutting down child threads or deleting local files). Therefore, I think
> the task should be marked as canceled or finished only after the source thread
> has completed.
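The ordering proposed in the issue description can be sketched in plain Java. This is a hypothetical helper, not Flink's API: `cancelAndAwait` cancels the source and then waits for the source thread to exit, so the caller reports a terminal state and releases resources only afterwards. The bounded `join` timeout is an assumption of this sketch, added so that a source that ignores `cancel()` cannot block cancellation forever; what to do on timeout is left to the caller.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of "join the source thread before the terminal state" (hypothetical, not Flink's API). */
public class JoinBeforeTerminalState {

    /**
     * Cancels the source and waits up to timeoutMillis for its thread to terminate.
     * Returns true if the thread has exited (safe to report CANCELED and release
     * resources), false if it is still running.
     */
    public static boolean cancelAndAwait(Thread sourceThread, Runnable cancelSource, long timeoutMillis)
            throws InterruptedException {
        cancelSource.run();               // e.g. call SourceFunction#cancel()
        sourceThread.join(timeoutMillis); // wait for the run() loop and its finally block
        return !sourceThread.isAlive();
    }

    /** Demo: a source whose finally block performs slow cleanup. */
    public static String demo() throws InterruptedException {
        StringBuffer events = new StringBuffer(); // synchronized, shared by two threads
        AtomicBoolean running = new AtomicBoolean(true);

        Thread source = new Thread(() -> {
            try {
                while (running.get()) {
                    Thread.sleep(10); // "emit a record"
                }
            } catch (InterruptedException ignored) {
                // not expected: nothing interrupts this thread in the demo
            } finally {
                try {
                    Thread.sleep(200); // simulate slow cleanup work
                } catch (InterruptedException ignored) {
                    // not expected either
                }
                events.append("cleanup;");
            }
        }, "legacy-source-thread");
        source.start();

        if (cancelAndAwait(source, () -> running.set(false), 5_000)) {
            events.append("release;"); // only now free the slot / class loader / blob files
        } else {
            events.append("timeout;");
        }
        return events.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
{code}

Here `demo()` returns `"cleanup;release;"`: `Thread#join` returns only after the source thread, including its `finally` block, has terminated, so the release step can no longer overtake the cleanup.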



--
This message was sent by Atlassian Jira
(v8.3.4#803005)