As a followup - I think it would be a good thing to add a way to gracefully
stop a streaming job.

Something that sends "close" to the sources, and they quit.

We can use this for graceful shutdown wen re-partitioninig / scaling in or
out, ...

On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Hi,
>
> I will pull the fix and try it out.
>
> Thanks for the hint with the extra Thread. That should work for me. But
> you are actually right; my setup is Storm inspired. I thinks its a very
> natural way to deploy and stop and infinite streaming job. Maybe, you
> want to adopt to it.
>
> The ITCase I am writing bases on StreamingProgramTestBase, so I need the
> JobExecutionResult because the test fails without it.
>
>
> -Matthias
>
>
>
> On 04/01/2015 11:09 AM, Márton Balassi wrote:
> > Hey Matthias,
> >
> > Thanks for reporting the Exception thrown, we were not preparing for this
> > use case yet. We fixed it with Gyula, he is pushing a fix for it right
> now:
> > When the job is cancelled (for example due to shutting down the executor
> > underneath) you should not see that InterruptedException as soon as this
> > commit is in. [1]
> >
> > As for getting the streaming JobExecutionResult back from a detached job
> my
> > current best practice is what you can see in
> > the ProcessFailureRecoveryTestBase and its streaming implementation:
> > starting an executor in a separate thread and then joining it with the
> main
> > one. Would you prefer a more Storm example-ish solution? [2]
> >
> > [1] https://github.com/mbalassi/flink/commit/5db06d6d
> > [2]
> >
> https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
> >
> > On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
> > mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi Robert,
> >>
> >> thanks for your answer.
> >>
> >> I get an InterruptedException when I call shutdown():
> >>
> >> java.lang.InterruptedException
> >>         at java.lang.Object.wait(Native Method)
> >>         at java.lang.Thread.join(Thread.java:1225)
> >>         at java.lang.Thread.join(Thread.java:1278)
> >>         at
> >>
> >>
> org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
> >>         at
> >>
> >>
> org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
> >>         at
> >>
> >>
> org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
> >>         at
> >>
> >>
> org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
> >>         at
> >>
> >>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> >>         at java.lang.Thread.run(Thread.java:701)
> >>
> >>
> >> About the JobExecutionResult:
> >>
> >> I added a new method to the API, that calls
> >> JobClient.submitJobDetached(...) instead of
> >> JobClient.submitJobAndWait(...). The "detached" version has no return
> >> value, while the blocking one returns a JobExecutionResult that is
> >> further returned by execute(). So I cannot get a JobExecutionResult
> >> right now.
> >>
> >> It would be nice to get the JobExecutionResult when stopping the running
> >> program via a "stop-execution"-call (is there any way to do this?).
> >> Right now, I sleep for a certain time after calling
> >> submitJobDetached(...) an call stop() and shutdown() later on (from
> >> ForkableMiniCluster). The stop() call does not seem to do anything...
> >> shutdown() works (except for the Exception I get -- as described above).
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 03/30/2015 09:08 PM, Robert Metzger wrote:
> >>> Hi Matthias,
> >>>
> >>> the streaming folks can probably answer the questions better. But I'll
> >>> write something to bring this message back to their attention ;)
> >>>
> >>> 1) Which exceptions are you seeing? Flink should be able to cleanly
> shut
> >>> down.
> >>> 2) As far as I saw it, the execute() method (of the Streaming API) got
> an
> >>> JobExecutionResult return type in the latest master. That contains
> >>> accumulator results.
> >>> 3) I think the cancel() method is there for exactly that purpose. If
> the
> >>> job is shutting down before the cancel method, that probably a bug.
> >>>
> >>>
> >>> Robert
> >>>
> >>>
> >>>
> >>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
> >>> mj...@informatik.hu-berlin.de> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I am trying to run an infinite streaming job (ie, one that does not
> >>>> terminate because it is generating output date randomly on the fly). I
> >>>> kill this job with .stop() or .shutdown() method of
> >>>> ForkableFlinkMiniCluster.
> >>>>
> >>>> I did not find any example using a similar setup. In the provided
> >>>> examples, each job terminate automatically, because only a finite
> input
> >>>> is processed and the source returns after all data is emitted.
> >>>>
> >>>>
> >>>> I have multiple question about my setup:
> >>>>
> >>>>  1) The job never terminates "clean", ie, I get some exceptions. Is
> this
> >>>> behavior desired?
> >>>>
> >>>>  2) Is it possible to get a result back? Similar to
> >>>> JobClient.submitJobAndWait(...)?
> >>>>
> >>>>  3) Is it somehow possible, to send a signal to the running job such
> >>>> that the source can terminate regularly as if finite input would be
> >>>> processed? Right now, I use an while(running) loop and set 'running'
> to
> >>>> false in the .cancel() method.
> >>>>
> >>>>
> >>>>
> >>>> Thanks for your help!
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to