Why is the timeout needed for Spark? Thanks
> On Apr 18, 2017, at 3:05 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>
> +1 on "runners really terminate in a timely manner ... to easily programmatically orchestrate Beam pipelines in a portable way, you do need to know whether the pipeline will finish without thinking about the specific runner and its options"
>
> As an example, in Nexmark we have streaming mode tests, and for the benchmark we need all the queries to behave the same across runners with respect to termination.
>
> For now, to get consistent behavior in this mode, we need to set a timeout (a bit arbitrary and flaky) on waitUntilFinish() for Spark, but this timeout is not needed for the direct runner.
>
> Etienne
>
>> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
>> Isn't this already the case? I think semantically it is an unavoidable conclusion, so certainly +1 to that.
>>
>> The DirectRunner and TestDataflowRunner both have this behavior already. I've always considered that a streaming job running forever is just [very] suboptimal shutdown latency :-)
>>
>> Some bits of the discussion on the ticket seem to surround whether or how to communicate this property in a generic way. Since a runner owns its PipelineResult, it doesn't seem necessary.
>>
>> So is the bottom line just that you want to more strongly insist that runners really terminate in a timely manner? I'm +1 to that, too, for basically the reason Stas gives: in order to easily programmatically orchestrate Beam pipelines in a portable way, you do need to know whether the pipeline will finish without thinking about the specific runner and its options (as with our RunnableOnService tests).
>>
>> Kenn
>>
>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid> wrote:
>>
>>> Note that even "unbounded pipeline in a streaming runner".waitUntilFinish() can return, e.g., if you cancel it or terminate it.
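The timeout workaround Etienne describes can be modeled in a self-contained way. The `Result` interface below is a hypothetical stand-in for Beam's `PipelineResult`, not the real API; the point is only to show why a fixed timeout on a never-terminating streaming job is flaky: it returns whatever state the job happens to be in when the clock runs out.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for Beam's PipelineResult (not the real API surface).
interface Result {
    enum State { RUNNING, DONE }
    State getState();
}

public class TimeoutWorkaround {
    // Mimics waitUntilFinish(Duration): poll until DONE or until the timeout elapses.
    static Result.State waitUntilFinish(Result result, Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (result.getState() == Result.State.DONE) {
                return Result.State.DONE;
            }
            Thread.sleep(10);
        }
        // Still RUNNING: the timeout papered over the runner's non-termination.
        return result.getState();
    }

    public static void main(String[] args) throws InterruptedException {
        // A streaming job on a runner that never terminates on its own.
        Result neverEnding = () -> Result.State.RUNNING;
        Result.State state = waitUntilFinish(neverEnding, Duration.ofMillis(100));
        System.out.println(state);
    }
}
```

If all runners guaranteed termination once the work is done, the orchestration code would call the no-argument wait and drop the timeout entirely.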
>>> It's totally reasonable for users to want to understand and handle these cases.
>>>
>>> +1
>>>
>>> Dan
>>>
>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>
>>>> +1
>>>>
>>>> Good idea !!
>>>>
>>>> Regards
>>>> JB
>>>>
>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>>>>>
>>>>> Raising this onto the mailing list from
>>>>> https://issues.apache.org/jira/browse/BEAM-849
>>>>>
>>>>> The issue came up: what does it mean for a pipeline to finish, in the Beam model?
>>>>>
>>>>> Note that I am deliberately not talking about "batch" and "streaming" pipelines, because this distinction does not exist in the model. Several runners have batch/streaming *modes*, which implement the same semantics (potentially different subsets: in batch mode typically a runner will reject pipelines that have at least one unbounded PCollection) but in an operationally different way. However, we should define pipeline termination at the level of the unified model, and then make sure that all runners in all modes implement that properly.
>>>>>
>>>>> One natural way is to say "a pipeline terminates when the output watermarks of all of its PCollections progress to +infinity". (Note: this can be generalized, I guess, to having partial executions of a pipeline: if you're interested in the full contents of only some collections, then you wait until only the watermarks of those collections progress to infinity.)
>>>>>
>>>>> A typical "batch" runner mode does not implement watermarks - we can think of it as assigning watermark -infinity to the output of a transform that hasn't started executing yet, and +infinity to the output of a transform that has finished executing. This is consistent with how such runners implement termination in practice.
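Eugene's proposed rule ("terminate when all output watermarks reach +infinity") and his reading of batch mode as a degenerate case can be sketched in a few lines. This is a hypothetical model for illustration, not Beam code; watermarks are represented as longs with Long.MAX_VALUE standing in for +infinity.

```java
import java.util.Map;

// Model of the proposed termination rule: a pipeline terminates once the
// output watermark of every PCollection has advanced to +infinity.
public class WatermarkTermination {
    static final long POS_INFINITY = Long.MAX_VALUE;
    static final long NEG_INFINITY = Long.MIN_VALUE;

    // Maps PCollection name -> current output watermark.
    static boolean isTerminated(Map<String, Long> outputWatermarks) {
        return outputWatermarks.values().stream().allMatch(w -> w == POS_INFINITY);
    }

    public static void main(String[] args) {
        // Batch mode seen through the model: -infinity before a transform
        // has run, +infinity once it has finished executing.
        Map<String, Long> midExecution = Map.of("read", POS_INFINITY, "parse", NEG_INFINITY);
        Map<String, Long> finished = Map.of("read", POS_INFINITY, "parse", POS_INFINITY);
        System.out.println(isTerminated(midExecution)); // false
        System.out.println(isTerminated(finished));     // true
    }
}
```

The same predicate covers the generalization to partial executions: restrict the map to the collections you care about and wait for those watermarks alone.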
>>>>>
>>>>> The Dataflow streaming runner additionally implements such termination for the pipeline drain operation, which has two parts: 1) stop consuming input from the sources, and 2) wait until all watermarks progress to infinity.
>>>>>
>>>>> Let us fill the gap by making this part of the Beam model and declaring that all runners should implement this behavior. This will give nice properties, e.g.:
>>>>> - A pipeline that has only bounded collections can be run by any runner in any mode, with the same results and termination behavior. (This is actually my motivating example for raising this issue: I was running Splittable DoFn tests
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>>>>> with the streaming Dataflow runner - these tests produce only bounded collections - and noticed that they wouldn't terminate even though all data was processed.)
>>>>> - It will be possible to implement pipelines that stream data for a while and then eventually successfully terminate based on some condition. E.g. a pipeline that watches a continuously growing file until it is marked read-only, or a pipeline that reads a Kafka topic partition until it receives a "poison pill" message. This seems handy.
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
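The "poison pill" idea in the last bullet can be illustrated with a self-contained simulation (the queue below stands in for a Kafka topic partition; the sentinel value and helper names are invented for this sketch): an unbounded-looking source becomes effectively bounded once a sentinel message arrives, at which point its watermark can jump to +infinity and the pipeline can terminate successfully.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the "poison pill" pattern: consume an unbounded-looking source
// until a sentinel message arrives, after which the source is done and the
// pipeline is free to terminate.
public class PoisonPill {
    static final String POISON = "__END__";

    static List<String> readUntilPoison(BlockingQueue<String> topic) throws InterruptedException {
        List<String> out = new ArrayList<>();
        while (true) {
            String msg = topic.take();
            if (POISON.equals(msg)) {
                break; // source exhausted: its watermark may now advance to +infinity
            }
            out.add(msg);
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> topic = new ArrayBlockingQueue<>(10);
        topic.put("a");
        topic.put("b");
        topic.put(POISON);
        System.out.println(readUntilPoison(topic)); // [a, b]
    }
}
```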