[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704554#comment-16704554
 ] 

Arnaud Linz commented on FLINK-10832:
-------------------------------------

Okay, I've just tested with the new 1.7.0. All I did is to replace the version 
in the pom, but the tests fails with :

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/shaded/guava18/com/google/common/hash/Hashing
 at 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:80)
 at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145)
 at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:93)
 at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:669)
 at 
org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:92)
 at flink.flink_10832.App.testFlink10832(App.java:60)
 at flink.flink_10832.App.main(App.java:31)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.shaded.guava18.com.google.common.hash.Hashing
 at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 8 more

 

It seems that your maven package for 1.7.0 is not complete...

 

And I'm stuck : 

< v 1.6.0 => use case fails in out of memory

v1.6.0 => use case succeeds but yarn containeres are never killed => no go

> v1.6.0 => local cluster does not end.

What do you mean with " I ran the test from the JIRA once each on master/1.6.2 
but it worked fine" ? You did not use any flink maven dependency but you've ran 
the test from the flink project itself ? Could it be a jar packaging issue ?

 

> StreamExecutionEnvironment.execute() does not return when all sources end
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10832
>                 URL: https://issues.apache.org/jira/browse/FLINK-10832
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.5.5, 1.6.2
>            Reporter: Arnaud Linz
>            Priority: Critical
>         Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), 
> This code never ends : 
>     *public* *void* testFlink() *throws* Exception {
>         // get the execution environment
>         *final* StreamExecutionEnvironment env = 
> StreamExecutionEnvironment._getExecutionEnvironment_();
>         // get input data
>         *final* DataStreamSource<String> text = env.addSource(*new* 
> +SourceFunction<String>()+ {
>             @Override
>             *public* *void* run(*final* SourceContext<String> ctx) *throws* 
> Exception {
>                 *for* (*int* count = 0; count < 5; count++) {
>                     ctx.collect(String._valueOf_(count));
>                 }
>             }
>             @Override
>             *public* *void* cancel() {
>             }
>         });
>         text.print().setParallelism(1);
>         env.execute("Simple Test");
>         // Never ends !
>     }
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve proper stop of streaming applications from their own 
> code, so it prevents us from using the last flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
>  {{[2018-11-07 11:11:23,583] INFO Shutting down 
> TaskExecutorLocalStateStoresManager. 
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
>  {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
> C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814
>  (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}}
>  {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and 
> its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}}
>  {{[2018-11-07 11:11:23,591] INFO Removing cache directory 
> C:\Users\alinz\AppData\Local\Temp\flink-web-ui 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}}
>  {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager. 
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}}
>  {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. 
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}}
>  {{[2018-11-07 11:11:23,596] INFO Close ResourceManager connection 
> cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new 
> address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)}}
>  {{[2018-11-07 11:11:23,607] INFO Stop job leader service. 
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)}}
>  {{[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to