Hey Matthias,

Thanks for reporting the exception; we had not prepared for this use case
yet. Gyula and I have fixed it, and he is pushing the fix right now. Once
this commit [1] is in, you should no longer see that InterruptedException
when the job is cancelled (for example because the executor underneath is
shut down).

As for getting the streaming JobExecutionResult back from a detached job, my
current best practice is what you can see in ProcessFailureRecoveryTestBase
and its streaming implementation: start the executor in a separate thread
and then join that thread from the main one. Would you prefer a solution
closer to the Storm example? [2]

[1] https://github.com/mbalassi/flink/commit/5db06d6d
[2] https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
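
To make the "separate thread, then join" idea concrete, here is a minimal
sketch that uses only JDK classes and no Flink API. JobRunner and joinAndGet
are hypothetical names invented for this illustration, and the Callable is
only a stand-in for a blocking execute() call that would return the
JobExecutionResult. It is not the code from ProcessFailureRecoveryTestBase.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class JoinedExecutorSketch {

    /** Hypothetical helper: runs a blocking job on its own thread. */
    public static final class JobRunner<T> extends Thread {
        private final Callable<T> blockingJob;
        private final AtomicReference<T> result = new AtomicReference<>();
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        public JobRunner(Callable<T> blockingJob) {
            super("job-runner");
            this.blockingJob = blockingJob;
        }

        @Override
        public void run() {
            try {
                // In a real setup this would be the blocking execute() call.
                result.set(blockingJob.call());
            } catch (Throwable t) {
                // Keep the failure so the joining thread can rethrow it.
                error.set(t);
            }
        }

        /** Joins the runner thread and returns the job result, or fails. */
        public T joinAndGet(long timeoutMillis) throws Exception {
            join(timeoutMillis);
            if (isAlive()) {
                throw new IllegalStateException("job did not finish in time");
            }
            if (error.get() != null) {
                throw new Exception("job failed", error.get());
            }
            return result.get();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for an infinite source: loops until the flag is cleared.
        AtomicBoolean running = new AtomicBoolean(true);
        JobRunner<Long> runner = new JobRunner<>(() -> {
            long emitted = 0;
            while (running.get()) {
                emitted++;
                Thread.sleep(1);
            }
            return emitted;
        });

        runner.start();          // the job runs "detached" from the main thread
        Thread.sleep(100);       // let it run for a while
        running.set(false);      // ask the job to stop
        Long emitted = runner.joinAndGet(5_000);
        System.out.println("records emitted before stop: " + emitted);
    }
}
```

The main thread stays free to trigger the stop, and the result becomes
available only after the join, which is what a detached submission lacks.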

On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Hi Robert,
>
> thanks for your answer.
>
> I get an InterruptedException when I call shutdown():
>
> java.lang.InterruptedException
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Thread.join(Thread.java:1225)
>         at java.lang.Thread.join(Thread.java:1278)
>         at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
>         at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
>         at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
>         at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>         at java.lang.Thread.run(Thread.java:701)
>
>
> About the JobExecutionResult:
>
> I added a new method to the API, that calls
> JobClient.submitJobDetached(...) instead of
> JobClient.submitJobAndWait(...). The "detached" version has no return
> value, while the blocking one returns a JobExecutionResult that is
> further returned by execute(). So I cannot get a JobExecutionResult
> right now.
>
> It would be nice to get the JobExecutionResult when stopping the running
> program via a "stop-execution"-call (is there any way to do this?).
> Right now, I sleep for a certain time after calling
> submitJobDetached(...) and call stop() and shutdown() later on (from
> ForkableMiniCluster). The stop() call does not seem to do anything...
> shutdown() works (except for the Exception I get -- as described above).
>
>
> -Matthias
>
>
> On 03/30/2015 09:08 PM, Robert Metzger wrote:
> > Hi Matthias,
> >
> > the streaming folks can probably answer the questions better. But I'll
> > write something to bring this message back to their attention ;)
> >
> > 1) Which exceptions are you seeing? Flink should be able to cleanly shut
> > down.
> > 2) As far as I saw it, the execute() method (of the Streaming API) got a
> > JobExecutionResult return type in the latest master. That contains
> > accumulator results.
> > 3) I think the cancel() method is there for exactly that purpose. If the
> > job is shutting down before the cancel method, that is probably a bug.
> >
> >
> > Robert
> >
> >
> >
> > On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
> > mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi,
> >>
> >> I am trying to run an infinite streaming job (i.e., one that does not
> >> terminate because it is generating output data randomly on the fly). I
> >> kill this job with the .stop() or .shutdown() method of
> >> ForkableFlinkMiniCluster.
> >>
> >> I did not find any example using a similar setup. In the provided
> >> examples, each job terminates automatically, because only a finite input
> >> is processed and the source returns after all data is emitted.
> >>
> >>
> >> I have multiple questions about my setup:
> >>
> >>  1) The job never terminates "clean", ie, I get some exceptions. Is this
> >> behavior desired?
> >>
> >>  2) Is it possible to get a result back? Similar to
> >> JobClient.submitJobAndWait(...)?
> >>
> >>  3) Is it somehow possible to send a signal to the running job such
> >> that the source can terminate regularly as if finite input had been
> >> processed? Right now, I use a while(running) loop and set 'running' to
> >> false in the .cancel() method.
> >>
> >>
> >>
> >> Thanks for your help!
> >>
> >> -Matthias
> >>
> >>
> >>
> >
>
>
