[
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711803#comment-16711803
]
Arnaud Linz edited comment on FLINK-10832 at 12/6/18 6:22 PM:
--------------------------------------------------------------
Okay, it seems to be a JDK-related issue. With 1.8.0_31x64 (Windows) it fails;
with 1.8.0_172-amd64 (CentOS) it works.
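
Since the behaviour appears to depend on the JDK build, it may help to log the exact runtime alongside each test run. A minimal sketch using only standard system properties (the class name JdkInfo is hypothetical, not part of the attached project):

```java
// Hypothetical helper: reports the JVM and OS a test is actually running on.
class JdkInfo {
    // Builds a one-line description from standard Java system properties.
    static String describeJvm() {
        return System.getProperty("java.version")
                + " (" + System.getProperty("java.vendor") + ", "
                + System.getProperty("os.name") + " "
                + System.getProperty("os.arch") + ")";
    }

    public static void main(String[] args) {
        System.out.println(describeJvm());
    }
}
```

Printing this at the start of testFlink() would make it easy to compare failing and passing environments.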
was (Author: arnaudl):
Which JDK are you using?
> StreamExecutionEnvironment.execute() does not return when all sources end
> -------------------------------------------------------------------------
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.5.5, 1.6.2
> Reporter: Arnaud Linz
> Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), the following code
> never ends:
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(
>         new SourceFunction<String>() {
>             @Override
>             public void run(final SourceContext<String> ctx) throws Exception {
>                 for (int count = 0; count < 5; count++) {
>                     ctx.collect(String.valueOf(count));
>                 }
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
>
> It's critical for us, as we rely heavily on this "source exhaustion stop"
> mechanism to stop streaming applications cleanly from their own code, so it
> prevents us from using the latest Flink versions.
>
> The log extract shows that the local cluster tried to shut down but, for no
> apparent reason, could not:
>
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std.
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
> {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using
> default (Memory / JobManager) MemoryStateBackend (data in heap memory /
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null',
> asynchronous: TRUE, maxStateSize: 5242880)
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
> {{0}}
> {{1}}
> {{2}}
> {{3}}
> {{4}}
> {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
> {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2).
> (org.apache.flink.runtime.taskmanager.Task:818)}}
> {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1)
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED]
> (org.apache.flink.runtime.taskmanager.Task:845)}}
> {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final
> execution state FINISHED to JobManager for task Source: Custom Source ->
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
> {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
> {{[2018-11-07 11:11:13,907] INFO Job Simple Test
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED.
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
> {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job
> 0ef8697ca98f6d2b565ed928d17c8a49.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
> {{[2018-11-07 11:11:13,908] INFO Shutting down
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
> {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
> {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint.
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
> {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor
> akka://flink/user/taskmanager_0.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
> {{[2018-11-07 11:11:23,583] INFO Shutting down
> TaskExecutorLocalStateStoresManager.
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
> {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory
> C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814
> (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}}
> {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and
> its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}}
> {{[2018-11-07 11:11:23,591] INFO Removing cache directory
> C:\Users\alinz\AppData\Local\Temp\flink-web-ui
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}}
> {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager.
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}}
> {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager.
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}}
> {{[2018-11-07 11:11:23,596] INFO Close ResourceManager connection
> cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new
> address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)}}
> {{[2018-11-07 11:11:23,607] INFO Stop job leader service.
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)}}
> {{[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor
> akka://flink/user/taskmanager_0.
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)}}
>
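
To tell a real hang of env.execute() apart from a slow shutdown, the reproduction can be run under a timeout. The following is only a sketch under stated assumptions: HangCheck and finishesWithin are hypothetical names, the code uses nothing but java.util.concurrent, and a short sleep stands in for the Flink job so that the snippet runs without Flink on the classpath.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical harness: runs a blocking call and reports whether it returned in time.
class HangCheck {
    // Returns true if job completed within the timeout, false if it was still blocked.
    // An exception thrown by the job itself is propagated to the caller.
    static boolean finishesWithin(Callable<?> job, long timeout, TimeUnit unit) throws Exception {
        // Daemon thread, so a job that never returns cannot keep the JVM alive.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "hang-check");
            t.setDaemon(true);
            return t;
        });
        Future<?> result = pool.submit(job);
        try {
            result.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // interrupt the worker; the job may ignore this
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the real call, e.g. () -> env.execute("Simple Test")
        boolean ok = finishesWithin(() -> { Thread.sleep(50); return null; }, 5, TimeUnit.SECONDS);
        System.out.println(ok ? "returned" : "still blocked after timeout");
    }
}
```

In a test, the lambda would wrap the body of testFlink(), and the test would fail when finishesWithin returns false, which makes the failing and passing JDKs easy to compare automatically.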
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)