[ 
https://issues.apache.org/jira/browse/FLINK-11250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuan Mei updated FLINK-11250:
-----------------------------
    Comment: was deleted

(was: merged commit 
[{{3b6b522}}|https://github.com/apache/flink/commit/3b6b5229df09c05fbeeaf88bf91df6d18c71097a]
 into apache:master)

> fix thread leaked when StreamTask switched from DEPLOYING to CANCELING
> ----------------------------------------------------------------------
>
>                 Key: FLINK-11250
>                 URL: https://issues.apache.org/jira/browse/FLINK-11250
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.5.6, 1.6.3, 1.7.1
>            Reporter: lamber-ken
>            Assignee: Anton Kalashnikov
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned, pull-request-available
>             Fix For: 1.7.3
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
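> Beginning with Flink 1.5.x, the streamRecordWriters are created in StreamTask's
> constructor, which starts the OutputFlusher daemon thread. So when a task
> switches from DEPLOYING to CANCELING, nothing stops that thread and it is
> leaked: in the log below, the OutputFlusher keeps running every 100 ms after
> the task has switched to CANCELED.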
>  
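> *Reproducible example*
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class OutputFlusherLeakRepro {  // class name is illustrative
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000);
>         env
>                 // Slow source: emits one record every 100 ms until canceled.
>                 .addSource(new SourceFunction<String>() {
>                     private volatile boolean running = true;
>
>                     @Override
>                     public void run(SourceContext<String> ctx) throws Exception {
>                         for (int i = 0; running && i < 100000000; i++) {
>                             Thread.sleep(100);
>                             ctx.collect("data " + i);
>                         }
>                     }
>
>                     @Override
>                     public void cancel() {
>                         running = false;
>                     }
>                 })
>                 // The sink throws in open() (division by zero), so the job fails
>                 // and, depending on timing, the source is canceled while DEPLOYING.
>                 .addSink(new RichSinkFunction<String>() {
>                     @Override
>                     public void open(Configuration parameters) throws Exception {
>                         System.out.println(1 / 0);
>                     }
>
>                     @Override
>                     public void invoke(String value, Context context) throws Exception {
>                     }
>                 }).setParallelism(2);
>         env.execute();
>     }
> }
> {code}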
> *Relevant log output*
> {code:java}
> 2019-01-02 03:03:47.525 [thread==> jobmanager-future-thread-2] 
> executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to SCHEDULED.
> 2019-01-02 03:03:47.526 [thread==> flink-akka.actor.default-dispatcher-5] 
> slotpool.SlotPool#allocateSlot:326 Received slot request 
> [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] for task: Attempt #1 
> (Source: Custom Source (1/1)) @ (unassigned) - [SCHEDULED]
> 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] 
> slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot 
> [SlotRequestId{494e47eb8318e2c09999a1db91dda6b8}] in slot 
> [SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}].
> 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] 
> slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
> single task slot [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] in multi 
> task slot [SlotRequestId{494e47eb8318e2c09999a1db91dda6b8}] for group 
> bc764cd8ddf7a0cff126f51c16239658.
> 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
> slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
> single task slot [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] in multi 
> task slot [SlotRequestId{494e47eb8318e2c09999a1db91dda6b8}] for group 
> 0a448493b4782967b150582570326227.
> 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
> slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot 
> [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] in slot 
> [SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}].
> 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
> slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
> single task slot [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] in multi 
> task slot [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] for group 
> 0a448493b4782967b150582570326227.
> 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] 
> executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from SCHEDULED to DEPLOYING.
> 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] 
> executiongraph.Execution#deploy:576 Deploying Source: Custom Source (1/1) 
> (attempt #1) to localhost
> 2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] 
> state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:162 
> Registered new local state store with configuration 
> LocalRecoveryConfig{localRecoveryMode=false, 
> localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/tmp/localState/aid_AllocationID{7b5faad9073d7fac6759e40981197b8d}],
>  jobID=06e76f6e31728025b22fdda9fadd6f01, 
> jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for 
> 06e76f6e31728025b22fdda9fadd6f01 - bc764cd8ddf7a0cff126f51c16239658 - 0 under 
> allocation id AllocationID{7b5faad9073d7fac6759e40981197b8d}.
> 2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] 
> partition.ResultPartition#<init>:172 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef): Initialized ResultPartition 
> 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef 
> [PIPELINED_BOUNDED, 2 subpartitions, 2 pending references]
> 2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] 
> taskexecutor.TaskExecutor#submitTask:541 Received task Source: Custom Source 
> (1/1).
> 2019-01-02 03:03:47.532 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#transitionState:992 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to DEPLOYING.
> 2019-01-02 03:03:47.532 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:655 Creating FileSystem stream leak safety net for task 
> Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING]
> 2019-01-02 03:03:47.532 [thread==> flink-akka.actor.default-dispatcher-2] 
> state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:166 Found 
> existing local state store for 06e76f6e31728025b22fdda9fadd6f01 - 
> 0a448493b4782967b150582570326227 - 0 under allocation id 
> AllocationID{7b5faad9073d7fac6759e40981197b8d}: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@176e8a65
> 2019-01-02 03:03:47.535 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:662 Loading JAR files for task Source: Custom Source 
> (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
> 2019-01-02 03:03:47.536 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#createUserCodeClassloader:947 Getting user code class loader 
> for task 74a4ed4bb2f80aa2b98e11bd09ea64ef at library cache manager took 0 
> milliseconds
> 2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:688 Registering task at network: Source: Custom Source 
> (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
> 2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] 
> buffer.LocalBufferPool#<init>:125 Using a local buffer pool with 2-12 buffers
> 2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] 
> partition.ResultPartitionManager#registerResultPartition:62 Registered 
> ResultPartition 
> 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef 
> [PIPELINED_BOUNDED, 2 subpartitions, 2 pending references].
> 2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] 
> network.TaskEventDispatcher#registerPartition:59 registering 
> 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef
> 2019-01-02 03:03:47.537 [thread==> flink-akka.actor.default-dispatcher-2] 
> state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:166 Found 
> existing local state store for 06e76f6e31728025b22fdda9fadd6f01 - 
> 0a448493b4782967b150582570326227 - 1 under allocation id 
> AllocationID{d8a1fd06e3e52101b003a04355f64be1}: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@57bf2d8e
> 2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:714 =======> next, kick off the background copying of 
> files for the distributed cache: Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
> 2019-01-02 03:03:47.538 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:729 =======> isCanceledOrFailed: Source: Custom Source 
> (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
> 2019-01-02 03:03:47.540 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:737 =======> call the user code initialization methods: 
> Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
> 2019-01-02 03:03:47.543 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:765 =======> now load and instantiate the task's 
> invokable code: Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
> 2019-01-02 03:03:47.543 [thread==> flink-akka.actor.default-dispatcher-5] 
> executiongraph.ExecutionGraph#transitionState:1356 Job Flink Streaming Job 
> (06e76f6e31728025b22fdda9fadd6f01) switched from state RUNNING to FAILING.
> 2019-01-02 03:03:47.544 [thread==> Source: Custom Source (1/1)] 
> tasks.StreamTask#createStreamRecordWriter:1214 Using partitioner REBALANCE 
> for output 0 of task Source: Custom Source
> 2019-01-02 03:03:47.544 [thread==> Source: Custom Source (1/1)] 
> io.StreamRecordWriter$OutputFlusher#<init>:174 StreamRecordWriter start : 
> outputflusher1546369427544, timeout: 100
> 2019-01-02 03:03:47.544 [thread==> flink-akka.actor.default-dispatcher-5] 
> executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from DEPLOYING to CANCELING.
> 2019-01-02 03:03:47.544 [thread==> flink-akka.actor.default-dispatcher-2] 
> taskmanager.Task#cancelExecution:1055 Attempting to cancel task Source: 
> Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef).
> 2019-01-02 03:03:47.544 [thread==> flink-akka.actor.default-dispatcher-2] 
> taskmanager.Task#cancelOrFailAndCancelInvokable:1078 
> cancelOrFailAndCancelInvokable_current_execution_state==> task: Source: 
> Custom Source (1/1), state: DEPLOYING, targetState CANCELING
> 2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-2] 
> taskmanager.Task#transitionState:992 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from DEPLOYING to CANCELING.
> 2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-3] 
> slotpool.SlotPool#releaseSlot:745 Releasing slot 
> [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] because: Slot is being 
> returned to the SlotPool.
> 2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-3] 
> slotpool.SlotPool#releaseSlot:745 Releasing slot 
> [SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}] because: Release multi task 
> slot because all children have been released.
> 2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-3] 
> slotpool.SlotPool#tryFulfillSlotRequestOrMakeAvailable:842 Adding returned 
> slot [AllocationID{d8a1fd06e3e52101b003a04355f64be1}] to available slots
> 2019-01-02 03:03:47.546 [thread==> flink-akka.actor.default-dispatcher-2] 
> taskexecutor.TaskExecutor#failPartition:656 Discarding the results produced 
> by task execution 8aeff98e78ed3d348af9d4d5679f2b26.
> 2019-01-02 03:03:47.546 [thread==> flink-akka.actor.default-dispatcher-2] 
> partition.ResultPartitionManager#releasePartitionsProducedBy:105 Released all 
> partitions produced by 8aeff98e78ed3d348af9d4d5679f2b26.
> 2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#transitionState:992 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CANCELING to CANCELED.
> 2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:892 Freeing task resources for Source: Custom Source 
> (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef).
> 2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] 
> network.NetworkEnvironment#unregisterTask:263 Unregister task Source: Custom 
> Source (1/1) from network environment (state: CANCELED).
> 2019-01-02 03:03:47.547 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] 
> partition.ResultPartition#release:313 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef): Releasing ResultPartition 
> 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef 
> [PIPELINED_BOUNDED, 2 subpartitions, 2 pending references].
> 2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] 
> partition.PipelinedSubpartition#release:137 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef): Released PipelinedSubpartition#0 [number 
> of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, 
> read view? false].
> 2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] 
> partition.PipelinedSubpartition#release:137 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef): Released PipelinedSubpartition#1 [number 
> of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, 
> read view? false].
> 2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] 
> partition.ResultPartitionManager#releasePartitionsProducedBy:105 Released all 
> partitions produced by 74a4ed4bb2f80aa2b98e11bd09ea64ef.
> 2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] 
> network.TaskEventDispatcher#unregisterPartition:78 unregistering 
> 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef
> 2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] 
> taskmanager.Task#run:919 Ensuring all FileSystem streams are closed for task 
> Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [CANCELED]
> 2019-01-02 03:03:47.549 [thread==> flink-akka.actor.default-dispatcher-2] 
> taskexecutor.TaskExecutor#unregisterTaskAndNotifyFinalState:1337 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task Source: Custom Source 74a4ed4bb2f80aa2b98e11bd09ea64ef.
> 2019-01-02 03:03:47.562 [thread==> flink-akka.actor.default-dispatcher-3] 
> slotpool.SlotPool#releaseSlot:745 Releasing slot 
> [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] because: Slot is being 
> returned to the SlotPool.
> 2019-01-02 03:03:47.566 [thread==> flink-akka.actor.default-dispatcher-3] 
> executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CANCELING to CANCELED.
> 2019-01-02 03:03:47.566 [thread==> flink-akka.actor.default-dispatcher-3] 
> executiongraph.Execution#processFail:1079 Ignoring transition of vertex 
> Source: Custom Source (1/1) - execution #1 to FAILED while being CANCELED.
> 2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] 
> slotpool.SlotPool#releaseSlot:745 Releasing slot 
> [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] because: Slot is being 
> returned to the SlotPool.
> 2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] 
> executiongraph.ExecutionGraph#tryRestartOrFail:1477 Try to restart or fail 
> the job Flink Streaming Job (06e76f6e31728025b22fdda9fadd6f01) if no longer 
> possible.
> 2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] 
> executiongraph.ExecutionGraph#transitionState:1356 Job Flink Streaming Job 
> (06e76f6e31728025b22fdda9fadd6f01) switched from state FAILING to RESTARTING.
> 2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] 
> executiongraph.ExecutionGraph#tryRestartOrFail:1487 Restarting the job Flink 
> Streaming Job (06e76f6e31728025b22fdda9fadd6f01).
> 2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] 
> slotpool.SlotPool#releaseSlot:745 Releasing slot 
> [SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}] because: Release multi task 
> slot because all children have been released.
> 2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] 
> slotpool.SlotPool#tryFulfillSlotRequestOrMakeAvailable:842 Adding returned 
> slot [AllocationID{7b5faad9073d7fac6759e40981197b8d}] to available slots
> 2019-01-02 03:03:47.647 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:47.748 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:47.848 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:47.948 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:48.049 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:48.149 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:48.249 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:48.349 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:48.450 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> 2019-01-02 03:03:48.550 [thread==> OutputFlusher for Source: Custom Source] 
> io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher 
> outputflusher1546369427544
> {code}
>  
>  
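
The lifecycle problem in the description can be illustrated without Flink. The sketch below is a hypothetical model, not Flink's code: `FlusherWriter`, `SketchTask`, and `cleanUpAfterCancel()` are made-up names standing in for StreamRecordWriter and StreamTask. A writer starts a periodic daemon flusher in its constructor; if the only place that closes it is the end of the task's `invoke()`, a task canceled before `invoke()` leaves the thread running, whereas a cleanup hook that also runs on cancellation stops it.

```java
import java.util.concurrent.TimeUnit;

public class FlusherLeakSketch {

    /** Hypothetical stand-in for a record writer that owns a periodic flusher thread. */
    static final class FlusherWriter implements AutoCloseable {
        private final Thread flusher;
        private volatile boolean running = true;

        FlusherWriter(String name, long timeoutMillis) {
            // As described in the issue, the thread is started from the constructor.
            flusher = new Thread(() -> {
                while (running) {
                    try {
                        Thread.sleep(timeoutMillis);
                    } catch (InterruptedException e) {
                        // Interrupted by close(): restore the flag and exit.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // A real writer would flush buffered output here.
                }
            }, "OutputFlusher for " + name);
            flusher.setDaemon(true);
            flusher.start();
        }

        boolean isFlusherAlive() {
            return flusher.isAlive();
        }

        @Override
        public void close() throws InterruptedException {
            running = false;
            flusher.interrupt();
            flusher.join(TimeUnit.SECONDS.toMillis(5));
        }
    }

    /** Hypothetical task that creates its writer in the constructor. */
    static final class SketchTask {
        final FlusherWriter writer;

        SketchTask(String name) {
            writer = new FlusherWriter(name, 100);
        }

        /** Normal path: the writer is closed only when invoke() finishes. */
        void invoke() throws InterruptedException {
            try {
                // Record processing would happen here.
            } finally {
                writer.close();
            }
        }

        /** Extra cleanup that must also run when the task is canceled before invoke(). */
        void cleanUpAfterCancel() throws InterruptedException {
            writer.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // Canceled while deploying: invoke() never runs, so nothing stops the flusher.
        SketchTask leaky = new SketchTask("leaky");
        System.out.println("leaky flusher alive: " + leaky.writer.isFlusherAlive());

        // Same situation, but a cancellation cleanup hook closes the writer.
        SketchTask fixed = new SketchTask("fixed");
        fixed.cleanUpAfterCancel();
        System.out.println("fixed flusher alive: " + fixed.writer.isFlusherAlive());
    }
}
```

Running it prints `leaky flusher alive: true` followed by `fixed flusher alive: false`. The point of the sketch is only that thread shutdown has to be tied to a cleanup path that runs on every exit, including cancellation before `invoke()`; it does not reproduce how the merged commit actually resolved the issue.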


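The log in the issue shows the `OutputFlusher for Source: Custom Source` thread still firing roughly every 100 ms after the task reached CANCELED. One simple way to check a local JVM or a test for such leftovers is to scan live threads by name prefix. This is a sketch that uses only the standard `Thread.getAllStackTraces()` API; `LeakedThreadCheck` and `findThreads` are hypothetical names, and the `OutputFlusher` prefix is taken from the thread name in the log.

```java
import java.util.List;
import java.util.stream.Collectors;

public class LeakedThreadCheck {

    /** Returns the sorted names of live threads whose name starts with the given prefix. */
    static List<String> findThreads(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.startsWith(prefix))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Simulate a leftover daemon flusher, named like the thread in the log.
        Thread leftover = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "OutputFlusher for Source: Custom Source");
        leftover.setDaemon(true);
        leftover.start();

        System.out.println("Leaked threads: " + findThreads("OutputFlusher"));

        // Stop the simulated thread and check again.
        leftover.interrupt();
        leftover.join();
        System.out.println("After cleanup: " + findThreads("OutputFlusher"));
    }
}
```

In a test, the same check can run after canceling a task to assert that no thread with the flusher prefix survives.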

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
