The Flink runner currently only supports blocking execution. I'll open a pull request to at least fix waitUntilFinish().
-Max

On Thu, Oct 13, 2016 at 11:10 AM, Amit Sela wrote:
> Hi Pei,
>
> I have someone on my team who started to work on this. I'll follow up;
> thanks for the bump ;-)
>
> Amit

On Thu, Oct 13, 2016 at 8:38 AM, Jean-Baptiste Onofré wrote:
> Hi Pei,
>
> Good one!
>
> We now have to update the 'other' runners.
>
> Thanks.
>
> Regards
> JB

On 10/12/2016 10:48 PM, Pei He wrote:
> Hi,
> I just want to bump this thread and bring it to attention.
>
> PipelineResult now has cancel() and waitUntilFinish(). However, currently
> only DataflowRunner supports them, in DataflowPipelineJob.
>
> We agreed that users should call "p.run().waitUntilFinish()" if they want
> to block. But if they do that now, the direct, Flink, and Spark runners
> will throw exceptions.
>
> I have opened the following JIRA issues; could anyone help with them?
>
> https://issues.apache.org/jira/browse/BEAM-596
> https://issues.apache.org/jira/browse/BEAM-595
> https://issues.apache.org/jira/browse/BEAM-593
>
> Thanks
> --
> Pei

On Tue, Jul 26, 2016 at 10:54 AM, Amit Sela wrote:
> +1 and Thanks!

On Tue, Jul 26, 2016 at 2:01 AM, Robert Bradshaw wrote:
> +1, sounds great. Thanks Pei.

On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik wrote:
> +1 for your proposal Pei

On Mon, Jul 25, 2016 at 5:54 PM, Pei He wrote:
> It looks to me like the following are agreed:
> (1) Adding cancel() and waitUntilFinish() to PipelineResult.
>     (In streaming mode, "all data watermarks reach infinity" is
>     considered finished.)
> (2) PipelineRunner.run() should return relatively quickly, as soon as
>     the pipeline/job is started/running.
>     The blocking logic should be left to users' code to handle with
>     PipelineResult.waitUntilFinish(). (Test runners that finish quickly
>     can block in run() until execution is done, which makes it cleaner to
>     verify test results after run().)
>
> I will send out a PR for (1), and create JIRA issues to improve runners
> for (2).
>
> waitToRunning() is controversial, and we have several half-agreed
> proposals. I will pull them out of this thread so we can close this
> proposal with cancel() and waitUntilFinish(), and I will create a JIRA
> issue to track how to support "waiting until other states".
>
> Does that sound good to everyone?
>
> Thanks
> --
> Pei

On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw wrote:
> On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers wrote:
>> This health check seems redundant with just waiting a while and then
>> checking on the status, other than returning earlier in the case of
>> reaching a terminal state. What about adding:
>>
>>   /**
>>    * Returns the state after waiting the specified duration. Will return
>>    * earlier if the pipeline reaches a terminal state.
>>    */
>>   State getStateAfter(Duration duration);
>>
>> This seems to be a useful building block, both for the user's pipeline
>> (in case they wanted to build something like wait and then check health)
>> and also for the SDK (to implement waitUntilFinished, etc.)
>
> A generic waitFor(Duration) which may return early if a terminal state
> is entered seems useful.
> I don't know that we need a return value here, given that we can then
> query the PipelineResult however we want once this returns.
> waitUntilFinished is simply waitFor(InfiniteDuration).
>
>> On Thu, Jul 21, 2016 at 4:11 PM, Pei He wrote:
>>> I am not in favor of supporting waiting for every state, or
>>> waitUntilState(...).
>>> One reason is that PipelineResult.State is not well defined and is not
>>> agreed upon across runners.
>>> Another reason is that users might not want to wait for a particular
>>> state. For example, waitUntilFinish() waits for a terminal state.
>>> So, even if runners have different states, we can still define shared
>>> properties, such as finished/terminal.
>
> +1. Running is an intermediate state that doesn't have an obvious
> mapping onto all runners, which is another reason it's odd to wait
> until then. All runners have terminal states.
>
>>> I think when users call waitUntilRunning(), they want to make sure the
>>> pipeline is up and running and is healthy.
>>> Maybe we want to wait until at least one element has gone through the
>>> pipeline.
>
> -1, that might be a while... Also, you may not start generating data
> until your pipeline is up.
>
>>> What about changing waitUntilRunning() to the following?
>>>
>>>   /**
>>>    * Checks whether the pipeline is healthy for the duration.
>>>    *
>>>    * Returns true if the pipeline is healthy at the end of the duration.
>>>    * Returns false if the pipeline is not healthy at the end of the duration.
>>>    * <p>It may return early if the pipeline is in an unrecoverable failure
>>>    * state.
>>>    */
>>>   boolean PipelineResult.healthCheck(Duration duration)
>>>
>>> (I think this also addresses Robert's comment about waitToRunning().)

On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles wrote:
> Some more comments:
>
> - What are the allowed/expected state transitions prior to RUNNING? Today,
>   I presume it is any nonterminal state, so it can be UNKNOWN or STOPPED
>   (which really means "not yet started") prior to RUNNING. Is this what
>   we want?
>
> - If a job can be paused (a transition from RUNNING to STOPPED), then
>   waitUntilPaused(Duration) makes sense.
>
> - Assuming there is some polling under the hood, are runners required to
>   send back a full history of transitions? Or can transitions be missed,
>   with only the latest state retrieved?
>
> - If the latter, then does waitUntilRunning() only wait until RUNNING, or
>   does it also return when it sees STOPPED, which could certainly indicate
>   that the job transitioned to RUNNING and then STOPPED between polls? In
>   that case it is, today, the same as waitUntilStateIsKnown().
>
> - The obvious limit of this discussion is waitUntilState(Duration,
>   Set<State>), which is the same amount of work to implement. Am I correct
>   that everyone in this thread thinks this generality is just not the
>   right thing for a user API?
>
> - This enum could probably use revision.
>   I'd choose some combination of tightening the enum, making it
>   extensible, and making some aspect of it free-form. Not sure where the
>   best balance lies.

On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers wrote:
> (Minor issue: I'd propose waitUntilDone and waitUntilRunning rather than
> waitToRunning, which reads oddly.)
>
> The only reason to separate submission from waitUntilRunning would be if
> you wanted to kick off several pipelines in quick succession, then wait
> for them all to be running. For instance:
>
>   PipelineResult p1Future = p1.run();
>   PipelineResult p2Future = p2.run();
>   ...
>
>   p1Future.waitUntilRunning();
>   p2Future.waitUntilRunning();
>   ...
>
> In this setup, you can start several pipelines more quickly, but your
> main program would still wait and report any errors before exiting.

On Thu, Jul 21, 2016 at 12:41 PM, Robert Bradshaw wrote:
> I'm in favor of the proposal. My only question is whether we need
> PipelineResult.waitToRunning(); instead, I'd propose that run() block
> until the pipeline is running/successfully submitted (or has failed).
> This would simplify the API: we'd only have one kind of wait that makes
> sense in all cases.
>
> What kinds of interactions would one want to have with the
> PipelineResult before it's running?
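The contract the thread converges on (run() returns a handle as soon as the job is started, and the caller decides how long to block) can be modeled in a few lines of plain Java. This is a standalone sketch, not Beam code: `State`, `Handle`, `run()`, `sleepFor()`, and the use of `CompletableFuture` and `java.time.Duration` are all assumptions made for illustration, and Beam's actual `PipelineResult` differs. `waitFor(Duration)` follows Robert's suggestion of a generic bounded wait that returns early on a terminal state, and `main` mirrors Ben's example of starting several pipelines before blocking on any of them.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncRunSketch {
  // Hypothetical, simplified stand-in for PipelineResult.State (not Beam's enum).
  enum State { RUNNING, DONE, FAILED, CANCELLED }

  // Hypothetical handle returned by run(); the job is already started.
  static final class Handle {
    private final CompletableFuture<State> completion;

    Handle(CompletableFuture<State> completion) {
      this.completion = completion;
    }

    // Current state, without blocking.
    State getState() {
      return completion.isDone() ? completion.join() : State.RUNNING;
    }

    // Bounded wait: blocks for at most `timeout`, returning early if the job
    // reaches a terminal state (i.e. the future completes).
    State waitFor(Duration timeout) {
      try {
        return completion.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        return getState();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return getState();
      } catch (ExecutionException e) {
        return State.FAILED;
      }
    }

    // Equivalent to waitFor() with an unbounded duration.
    State waitUntilFinish() {
      return completion.join();
    }

    // Only marks the handle as cancelled; a real runner would also stop the job.
    State cancel() {
      completion.complete(State.CANCELLED);
      return getState();
    }
  }

  // Hypothetical runner: starts the work asynchronously and returns immediately.
  static Handle run(Runnable pipelineBody) {
    CompletableFuture<State> completion =
        CompletableFuture.supplyAsync(
                () -> {
                  pipelineBody.run();
                  return State.DONE;
                })
            .exceptionally(t -> State.FAILED); // any failure maps to FAILED
    return new Handle(completion);
  }

  // Simulated pipeline that takes `millis` milliseconds to finish.
  static Runnable sleepFor(long millis) {
    return () -> {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
  }

  public static void main(String[] args) {
    // Start several pipelines in quick succession, then block on each one.
    Handle p1 = run(sleepFor(100));
    Handle p2 = run(sleepFor(100));
    System.out.println(p1.waitUntilFinish() + " " + p2.waitUntilFinish()); // DONE DONE

    // A bounded wait on a long-running job returns its current, non-terminal state.
    Handle slow = run(sleepFor(5_000));
    System.out.println(slow.waitFor(Duration.ofMillis(50))); // RUNNING
    System.out.println(slow.cancel()); // CANCELLED
  }
}
```

Returning the state from `waitFor()` is only a convenience in this sketch; as Robert notes, the caller could just as well call `getState()` after the wait returns.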
On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh wrote:
> TestPipeline is probably the one runner that can be expected to block, as
> JUnit tests certainly, and likely other tests, will run the Pipeline and
> succeed even if the PipelineRunner throws an exception. Luckily, this can
> be added to TestPipeline.run(), which already has additional behavior
> associated with it (currently regarding the unwrapping of
> AssertionErrors).

On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles wrote:
> I like this proposal. It makes pipeline.run() seem like a pretty normal
> async request, and easy to program with. It removes the implicit
> assumption in the prior design that main() is pretty much just "build and
> run a pipeline".
>
> The part of this that I care about most is being able to write a program
> (not the pipeline, but the program that launches one or more pipelines)
> that has reasonable cross-runner behavior.
>
> One comment:
>
> On Wed, Jul 20, 2016 at 3:39 PM, Pei He wrote:
>> 4. PipelineRunner.run() should (but is not required to) do non-blocking
>> runs
>
> I think we can elaborate on this a little bit.
> Obviously there might be "blocking" in terms of, say, an HTTP round-trip
> to submit the job, but run() should never be non-terminating.
>
> For a test runner that finishes the pipeline quickly, I would be fine
> with run() just executing the pipeline, but the PipelineResult should
> still emulate the usual behavior, just always returning a terminal
> status. It would be annoying to add waitToFinish() to the end of all our
> tests, but ending them with just run() makes the tests work only with
> special blocking runner wrappers (and makes them poor examples). A JUnit
> @Rule for test pipelines would hide all that, perhaps.
>
> Kenn

--
Jean-Baptiste Onofré
http://blog.nanthrax.net
Talend - http://www.talend.com
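Kenn's point about test runners (run() may execute the whole pipeline, as long as the returned result already reports a terminal status) can be illustrated with a second standalone sketch. As before, this is not Beam code: `State`, `Result`, `Runner`, `BlockingTestRunner`, `AsyncRunner`, and `runToCompletion()` are hypothetical names chosen for illustration. The sketch shows launcher code written once against a common contract that works unchanged with a blocking test runner and an asynchronous one.

```java
import java.util.concurrent.CompletableFuture;

public class TestRunnerSketch {
  // Hypothetical, simplified stand-ins for PipelineResult.State and PipelineResult.
  enum State { RUNNING, DONE, FAILED }

  interface Result {
    State getState();

    State waitUntilFinish();
  }

  interface Runner {
    Result run(Runnable pipelineBody);
  }

  // A result that is already terminal, so waitUntilFinish() returns immediately.
  static final class CompletedResult implements Result {
    private final State state;

    CompletedResult(State state) {
      this.state = state;
    }

    @Override public State getState() { return state; }

    @Override public State waitUntilFinish() { return state; }
  }

  // Hypothetical test runner: executes the pipeline synchronously inside run(),
  // so results can be verified as soon as run() returns.
  static final class BlockingTestRunner implements Runner {
    @Override
    public Result run(Runnable pipelineBody) {
      try {
        pipelineBody.run();
        return new CompletedResult(State.DONE);
      } catch (RuntimeException | AssertionError e) {
        return new CompletedResult(State.FAILED);
      }
    }
  }

  // Hypothetical production-style runner: returns as soon as the job is started.
  static final class AsyncRunner implements Runner {
    @Override
    public Result run(Runnable pipelineBody) {
      CompletableFuture<State> completion =
          CompletableFuture.supplyAsync(
                  () -> {
                    pipelineBody.run();
                    return State.DONE;
                  })
              .exceptionally(t -> State.FAILED);
      return new Result() {
        @Override public State getState() {
          return completion.isDone() ? completion.join() : State.RUNNING;
        }

        @Override public State waitUntilFinish() { return completion.join(); }
      };
    }
  }

  // Launcher code written once against the shared contract.
  static State runToCompletion(Runner runner, Runnable pipelineBody) {
    return runner.run(pipelineBody).waitUntilFinish();
  }

  public static void main(String[] args) {
    Runnable ok = () -> {};
    Runnable failing = () -> { throw new AssertionError("assertion failed"); };

    Result r = new BlockingTestRunner().run(ok);
    System.out.println(r.getState()); // DONE, with no explicit wait
    System.out.println(runToCompletion(new BlockingTestRunner(), failing)); // FAILED
    System.out.println(runToCompletion(new AsyncRunner(), ok)); // DONE
  }
}
```

Because the test runner's result is already terminal when run() returns, a test can verify output right away, while launcher code that always ends with waitUntilFinish() runs unchanged on either runner.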
