As a follow-up - I think it would be a good thing to add a way to gracefully stop a streaming job.
Something that sends "close" to the sources, so that they quit. We could use this for graceful shutdown when re-partitioning / scaling in or out, ...

On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Hi,
>
> I will pull the fix and try it out.
>
> Thanks for the hint with the extra thread. That should work for me. But
> you are actually right; my setup is Storm inspired. I think it is a very
> natural way to deploy and stop an infinite streaming job. Maybe you
> want to adopt it.
>
> The ITCase I am writing is based on StreamingProgramTestBase, so I need
> the JobExecutionResult because the test fails without it.
>
>
> -Matthias
>
>
> On 04/01/2015 11:09 AM, Márton Balassi wrote:
> > Hey Matthias,
> >
> > Thanks for reporting the exception thrown; we were not preparing for
> > this use case yet. We fixed it with Gyula, and he is pushing a fix for
> > it right now: when the job is cancelled (for example due to shutting
> > down the executor underneath) you should not see that
> > InterruptedException as soon as this commit is in. [1]
> >
> > As for getting the streaming JobExecutionResult back from a detached
> > job, my current best practice is what you can see in
> > the ProcessFailureRecoveryTestBase and its streaming implementation:
> > starting an executor in a separate thread and then joining it with the
> > main one. Would you prefer a more Storm-example-ish solution? [2]
> >
> > [1] https://github.com/mbalassi/flink/commit/5db06d6d
> > [2]
> > https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
> >
> > On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
> > mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi Robert,
> >>
> >> thanks for your answer.
> >>
> >> I get an InterruptedException when I call shutdown():
> >>
> >> java.lang.InterruptedException
> >>     at java.lang.Object.wait(Native Method)
> >>     at java.lang.Thread.join(Thread.java:1225)
> >>     at java.lang.Thread.join(Thread.java:1278)
> >>     at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
> >>     at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
> >>     at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
> >>     at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
> >>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> >>     at java.lang.Thread.run(Thread.java:701)
> >>
> >>
> >> About the JobExecutionResult:
> >>
> >> I added a new method to the API that calls
> >> JobClient.submitJobDetached(...) instead of
> >> JobClient.submitJobAndWait(...). The "detached" version has no return
> >> value, while the blocking one returns a JobExecutionResult that is in
> >> turn returned by execute(). So I cannot get a JobExecutionResult
> >> right now.
> >>
> >> It would be nice to get the JobExecutionResult when stopping the
> >> running program via a "stop-execution" call (is there any way to do
> >> this?). Right now, I sleep for a certain time after calling
> >> submitJobDetached(...) and call stop() and shutdown() later on (from
> >> ForkableMiniCluster). The stop() call does not seem to do anything...
> >> shutdown() works (except for the exception I get, as described above).
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 03/30/2015 09:08 PM, Robert Metzger wrote:
> >>> Hi Matthias,
> >>>
> >>> the streaming folks can probably answer the questions better. But
> >>> I'll write something to bring this message back to their attention ;)
> >>>
> >>> 1) Which exceptions are you seeing? Flink should be able to cleanly
> >>> shut down.
> >>> 2) As far as I saw it, the execute() method (of the Streaming API)
> >>> got a JobExecutionResult return type in the latest master. That
> >>> contains accumulator results.
> >>> 3) I think the cancel() method is there for exactly that purpose. If
> >>> the job is shutting down before the cancel method, that is probably
> >>> a bug.
> >>>
> >>>
> >>> Robert
> >>>
> >>>
> >>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
> >>> mj...@informatik.hu-berlin.de> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I am trying to run an infinite streaming job (ie, one that does not
> >>>> terminate because it generates output data randomly on the fly). I
> >>>> kill this job with the .stop() or .shutdown() method of
> >>>> ForkableFlinkMiniCluster.
> >>>>
> >>>> I did not find any example using a similar setup. In the provided
> >>>> examples, each job terminates automatically, because only a finite
> >>>> input is processed and the source returns after all data is emitted.
> >>>>
> >>>>
> >>>> I have multiple questions about my setup:
> >>>>
> >>>> 1) The job never terminates "cleanly", ie, I get some exceptions.
> >>>> Is this behavior desired?
> >>>>
> >>>> 2) Is it possible to get a result back? Similar to
> >>>> JobClient.submitJobAndWait(...)?
> >>>>
> >>>> 3) Is it somehow possible to send a signal to the running job such
> >>>> that the source can terminate regularly, as if finite input were
> >>>> being processed? Right now, I use a while(running) loop and set
> >>>> 'running' to false in the .cancel() method.
> >>>>
> >>>>
> >>>> Thanks for your help!
> >>>>
> >>>> -Matthias
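[Editor's note] The two patterns discussed in the thread, the while(running)/cancel() source and the "detached execution in a separate thread joined with the main one", can be sketched in plain Java. This is only an illustration of the threading idea, not the actual Flink SourceFunction API; the class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class CancellableSourceSketch {

    // Mimics an infinite streaming source: emits values until cancel()
    // flips the flag, so the source can terminate regularly instead of
    // being interrupted.
    static class Source {
        // volatile so the cancellation flag is visible across threads
        private volatile boolean running = true;
        final List<Long> emitted = new ArrayList<>();

        void run() {
            long value = 0;
            while (running) {
                emitted.add(value++); // stand-in for collector.collect(...)
            }
        }

        void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Source source = new Source();

        // "Detached" execution: run the job in a separate thread ...
        Thread job = new Thread(source::run);
        job.start();

        Thread.sleep(100); // let the source emit for a while

        // ... then signal a graceful stop and join the job thread with the
        // main one, rather than interrupting it (interruption is what
        // produced the InterruptedException in the stack trace above).
        source.cancel();
        job.join();

        System.out.println("emitted " + source.emitted.size() + " records");
    }
}
```

The key design point is that cancellation is cooperative: the source checks the volatile flag and returns on its own, so downstream close() logic runs normally.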