[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705012#comment-16705012
 ] 

Arnaud Linz commented on FLINK-10832:
-------------------------------------

I've fixed my corrupted Maven cache, and the run is now fine.

However, with 1.7.0 the local cluster still does not shut down.

> StreamExecutionEnvironment.execute() does not return when all sources end
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10832
>                 URL: https://issues.apache.org/jira/browse/FLINK-10832
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.5.5, 1.6.2
>            Reporter: Arnaud Linz
>            Priority: Critical
>         Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
>     public void testFlink() throws Exception {
>         // get the execution environment
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // get input data
>         final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>             @Override
>             public void run(final SourceContext<String> ctx) throws Exception {
>                 for (int count = 0; count < 5; count++) {
>                     ctx.collect(String.valueOf(count));
>                 }
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>         text.print().setParallelism(1);
>         env.execute("Simple Test");
>         // Never ends!
>     }
>  
> This is critical for us: we rely heavily on this "source exhaustion stop"
> mechanism to stop streaming applications cleanly from their own code, so
> the bug prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down but could
> not, for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
>  {{[2018-11-07 11:11:23,583] INFO Shutting down 
> TaskExecutorLocalStateStoresManager. 
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
>  {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
> C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814
>  (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}}
>  {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and 
> its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}}
>  {{[2018-11-07 11:11:23,591] INFO Removing cache directory 
> C:\Users\alinz\AppData\Local\Temp\flink-web-ui 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}}
>  {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager. 
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}}
>  {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. 
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}}
>  {{[2018-11-07 11:11:23,596] INFO Close ResourceManager connection 
> cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new 
> address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)}}
>  {{[2018-11-07 11:11:23,607] INFO Stop job leader service. 
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)}}
>  {{[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)}}
>  
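
A minimal sketch, not part of the original report: until the cause of the hang is known, a test can at least fail fast instead of blocking forever by running the blocking call on a separate thread with a timeout. The class and method names (ExecuteWatchdog, returnsWithin) are hypothetical, and the example uses only the JDK; in a Flink test the callable would presumably be something like () -> env.execute("Simple Test"). This only detects the hang, it does not fix it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Flink.
public class ExecuteWatchdog {

    /**
     * Runs a blocking call on a daemon thread and reports whether it
     * returned before the timeout expired.
     */
    public static boolean returnsWithin(Callable<?> blockingCall, long timeout, TimeUnit unit)
            throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "execute-watchdog");
            t.setDaemon(true); // a hung call must not keep the JVM alive
            return t;
        });
        try {
            Future<?> result = executor.submit(blockingCall);
            result.get(timeout, unit); // rethrows failures of the call as ExecutionException
            return true;
        } catch (TimeoutException e) {
            return false; // the call did not return in time
        } finally {
            executor.shutdownNow(); // interrupts the worker if it is still blocked
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a job that ends when its sources end.
        boolean quick = returnsWithin(() -> "done", 1, TimeUnit.SECONDS);
        // Stand-in for the hang: blocks far longer than the timeout.
        boolean slow = returnsWithin(() -> {
            Thread.sleep(60_000);
            return null;
        }, 200, TimeUnit.MILLISECONDS);
        System.out.println("quick=" + quick + ", slow=" + slow);
    }
}
```

A JUnit test could then assert that returnsWithin(...) is true for the job above, so an affected version fails with a clear message rather than never finishing.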



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
