Hi,

I will pull the fix and try it out.

Thanks for the hint with the extra Thread. That should work for me. But
you are actually right; my setup is Storm inspired. I think it's a very
natural way to deploy and stop an infinite streaming job. Maybe you
want to adopt it.

The ITCase I am writing is based on StreamingProgramTestBase, so I need the
JobExecutionResult because the test fails without it.
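
A minimal sketch of the extra-thread approach, in plain Java rather than
against Flink's API: the blocking call runs in a separate thread, the main
thread signals the job to stop and then joins it to collect the result.
InfiniteJob, runAndStop and the Long result are hypothetical stand-ins for
the streaming job, the test driver and the JobExecutionResult; none of
them exist in Flink.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DetachedJobSketch {

    /** Hypothetical stand-in for an infinite streaming job; not a Flink class. */
    static final class InfiniteJob implements Callable<Long> {
        private volatile boolean running = true;
        private final CountDownLatch started = new CountDownLatch(1);

        @Override
        public Long call() {
            long emitted = 0;
            while (running) {          // same idea as the while(running) source loop
                emitted++;             // pretend to emit one record
                started.countDown();   // signal that at least one record was emitted
                Thread.yield();
            }
            return emitted;            // plays the role of the job result
        }

        void cancel() {
            running = false;           // lets the loop finish regularly
        }

        void awaitStarted() throws InterruptedException {
            started.await();
        }
    }

    /** Runs the job off the main thread, stops it after runMillis, returns its result. */
    static long runAndStop(long runMillis) {
        InfiniteJob job = new InfiniteJob();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Long> result = executor.submit(job); // blocking work runs in the extra thread
            job.awaitStarted();
            Thread.sleep(runMillis);
            job.cancel();
            return result.get();                        // join: wait for the job and fetch its result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            job.cancel();                               // harmless if already cancelled
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("records emitted: " + runAndStop(100));
    }
}
```

In a test, the value returned by runAndStop(...) would take the place of
the JobExecutionResult that the test base expects.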


-Matthias



On 04/01/2015 11:09 AM, Márton Balassi wrote:
> Hey Matthias,
> 
> Thanks for reporting the Exception. We had not prepared for this use
> case yet. Gyula and I fixed it, and he is pushing the fix right now:
> when the job is cancelled (for example due to shutting down the executor
> underneath) you should no longer see that InterruptedException once this
> commit is in. [1]
> 
> As for getting the streaming JobExecutionResult back from a detached job, my
> current best practice is what you can see in
> the ProcessFailureRecoveryTestBase and its streaming implementation:
> starting an executor in a separate thread and then joining it with the main
> one. Would you prefer a more Storm example-ish solution? [2]
> 
> [1] https://github.com/mbalassi/flink/commit/5db06d6d
> [2]
> https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
> 
> On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
> 
>> Hi Robert,
>>
>> thanks for your answer.
>>
>> I get an InterruptedException when I call shutdown():
>>
>> java.lang.InterruptedException
>>         at java.lang.Object.wait(Native Method)
>>         at java.lang.Thread.join(Thread.java:1225)
>>         at java.lang.Thread.join(Thread.java:1278)
>>         at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
>>         at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
>>         at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
>>         at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
>>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>         at java.lang.Thread.run(Thread.java:701)
>>
>>
>> About the JobExecutionResult:
>>
>> I added a new method to the API, that calls
>> JobClient.submitJobDetached(...) instead of
>> JobClient.submitJobAndWait(...). The "detached" version has no return
>> value, while the blocking one returns a JobExecutionResult that is
>> further returned by execute(). So I cannot get a JobExecutionResult
>> right now.
>>
>> It would be nice to get the JobExecutionResult when stopping the running
>> program via a "stop-execution"-call (is there any way to do this?).
>> Right now, I sleep for a certain time after calling
>> submitJobDetached(...) and call stop() and shutdown() later on (from
>> ForkableMiniCluster). The stop() call does not seem to do anything...
>> shutdown() works (except for the Exception I get -- as described above).
>>
>>
>> -Matthias
>>
>>
>> On 03/30/2015 09:08 PM, Robert Metzger wrote:
>>> Hi Matthias,
>>>
>>> the streaming folks can probably answer the questions better. But I'll
>>> write something to bring this message back to their attention ;)
>>>
>>> 1) Which exceptions are you seeing? Flink should be able to cleanly shut
>>> down.
>>> 2) As far as I saw it, the execute() method (of the Streaming API) got a
>>> JobExecutionResult return type in the latest master. That contains
>>> accumulator results.
>>> 3) I think the cancel() method is there for exactly that purpose. If the
>>> job is shutting down before the cancel method, that is probably a bug.
>>>
>>>
>>> Robert
>>>
>>>
>>>
>>> On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
>>> mj...@informatik.hu-berlin.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to run an infinite streaming job (i.e., one that does not
>>>> terminate because it is generating output data randomly on the fly). I
>>>> kill this job with the .stop() or .shutdown() method of
>>>> ForkableFlinkMiniCluster.
>>>>
>>>> I did not find any example using a similar setup. In the provided
>>>> examples, each job terminates automatically, because only a finite input
>>>> is processed and the source returns after all data is emitted.
>>>>
>>>>
>>>> I have multiple questions about my setup:
>>>>
>>>>  1) The job never terminates cleanly, i.e., I get some exceptions. Is this
>>>> behavior desired?
>>>>
>>>>  2) Is it possible to get a result back? Similar to
>>>> JobClient.submitJobAndWait(...)?
>>>>
>>>>  3) Is it somehow possible to send a signal to the running job such
>>>> that the source can terminate regularly, as if finite input were
>>>> processed? Right now, I use a while(running) loop and set 'running' to
>>>> false in the .cancel() method.
>>>>
>>>>
>>>>
>>>> Thanks for your help!
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>
>>
>>
> 
