+1, sounds great. Thanks Pei.
On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik wrote:
> +1 for your proposal, Pei.
>
> On Mon, Jul 25, 2016 at 5:54 PM, Pei He wrote:
>
>> It looks to me like the following points are agreed:
>> (1) Add cancel() and waitUntilFinish() to PipelineResult.
>> (In streaming mode, "all data watermarks have reached infinity" is
>> considered finished.)
>> (2) PipelineRunner.run() should return relatively quickly, as soon as
>> the pipeline/job has started running. The blocking logic should be left
>> to users' code to handle, with PipelineResult.waitUntilFinish(). (Test
>> runners that finish quickly can block in run() until execution is
>> done, which makes it cleaner to verify test results after run().)
>>
>> I will send out a PR for (1), and create JIRA issues to improve runners for
>> (2).
>>
>> waitToRunning() is controversial, and we have several half-agreed
>> proposals.
>> I will pull them out of this thread, so we can close this proposal
>> with cancel() and waitUntilFinish(). I will also create a JIRA issue
>> to track how to support "waiting until other states".
>>
>> Does that sound good to everyone?
>>
>> Thanks
>> --
>> Pei
>>
>> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw wrote:
>> > On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers wrote:
>> >> This health check seems redundant with just waiting a while and then
>> >> checking on the status, other than returning earlier in the case of
>> >> reaching a terminal state. What about adding:
>> >>
>> >> /**
>> >>  * Returns the state after waiting the specified duration. Will return
>> >>  * earlier if the pipeline reaches a terminal state.
>> >>  */
>> >> State getStateAfter(Duration duration);
>> >>
>> >> This seems to be a useful building block, both for the user's pipeline (in
>> >> case they wanted to build something like wait and then check health) and
>> >> also for the SDK (to implement waitUntilFinished, etc.)
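Ben's getStateAfter(Duration), together with Robert's reply that waitUntilFinished is simply waitFor(InfiniteDuration), can be sketched as one polling loop. This is a minimal sketch, not Beam's implementation. The class name, the illustrative State enum, the Supplier<State> standing in for a runner's status query, the poll interval, and the use of a null duration to mean "no deadline" are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical sketch of the getStateAfter(Duration) idea; not Beam's actual PipelineResult API.
public class GetStateAfterSketch {

  // Illustrative subset of pipeline states; the real enum was still under discussion.
  enum State {
    UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED;

    boolean isTerminal() {
      return this == DONE || this == FAILED || this == CANCELLED;
    }
  }

  /**
   * Polls currentState until it reports a terminal state or the duration elapses,
   * and returns the last state observed. A null duration means "wait forever".
   */
  static State getStateAfter(Supplier<State> currentState, Duration duration, Duration pollInterval) {
    boolean forever = (duration == null);
    long deadline = forever ? 0L : System.nanoTime() + duration.toNanos();
    while (true) {
      State state = currentState.get();
      if (state.isTerminal()) {
        return state; // return early: a terminal state will not change
      }
      long sleepMillis = Math.max(1L, pollInterval.toMillis());
      if (!forever) {
        long remainingNanos = deadline - System.nanoTime();
        if (remainingNanos <= 0) {
          return state; // deadline reached: report the latest observed state
        }
        sleepMillis = Math.min(sleepMillis, Math.max(1L, remainingNanos / 1_000_000L));
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return state;
      }
    }
  }

  /** waitUntilFinish expressed as an unbounded getStateAfter, as Robert suggests. */
  static State waitUntilFinish(Supplier<State> currentState, Duration pollInterval) {
    return getStateAfter(currentState, null, pollInterval);
  }

  public static void main(String[] args) {
    Duration poll = Duration.ofMillis(5);
    // A job that never finishes: the call returns its state once 20 ms have elapsed.
    System.out.println(getStateAfter(() -> State.RUNNING, Duration.ofMillis(20), poll)); // RUNNING
    // A job that reports RUNNING for its first three polls, then DONE.
    int[] polls = {0};
    Supplier<State> job = () -> ++polls[0] <= 3 ? State.RUNNING : State.DONE;
    System.out.println(waitUntilFinish(job, poll)); // DONE
  }
}
```

Returning the last observed state, rather than void, covers both Ben's getStateAfter and Robert's waitFor. A caller that does not need the value can ignore it and query the result handle afterwards.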
>> >
>> > A generic waitFor(Duration) which may return early if a terminal state
>> > is entered seems useful. I don't know that we need a return value
>> > here, given that we can then query the PipelineResult however we want
>> > once this returns. waitUntilFinished is simply
>> > waitFor(InfiniteDuration).
>> >
>> >> On Thu, Jul 21, 2016 at 4:11 PM Pei He wrote:
>> >>
>> >>> I am not in favor of supporting a wait for every state, or
>> >>> waitUntilState(...).
>> >>> One reason is that PipelineResult.State is not well defined and is not
>> >>> agreed upon across runners.
>> >>> Another reason is that users might not want to wait for a particular state.
>> >>> For example,
>> >>> waitUntilFinish() waits for a terminal state.
>> >>> So, even if runners have different states, we can still define shared
>> >>> properties, such as finished/terminal.
>> >
>> > +1. Running is an intermediate state that doesn't have an obvious
>> > mapping onto all runners, which is another reason it's odd to wait
>> > until then. All runners have terminal states.
>> >
>> >>> I think when users call waitUntilRunning(), they want to make sure the
>> >>> pipeline is up and running and is healthy. Maybe we want to wait until at
>> >>> least one element has gone through the pipeline.
>> >
>> > -1. That might be a while... Also, you may not start generating data
>> > until your pipeline is up.
>> >
>> >>> What about changing waitUntilRunning() to the following?
>> >>>
>> >>> /**
>> >>>  * Checks whether the pipeline is healthy for the duration.
>> >>>  *
>> >>>  * Returns true if the pipeline is healthy at the end of the duration.
>> >>>  * Returns false if the pipeline is not healthy at the end of the duration.
>> >>>  * <p>It may return early if the pipeline is in an unrecoverable failure
>> >>>  * state.
>> >>>  */
>> >>> boolean PipelineResult.healthCheck(Duration duration)
>> >>>
>> >>> (I think this also addresses Robert's comment about waitToRunning().)
>> >>>
>> >>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles wrote:
>> >>> > Some more comments:
>> >>> >
>> >>> > - What are the allowed/expected state transitions prior to RUNNING? Today,
>> >>> > I presume it is any nonterminal state, so it can be UNKNOWN or STOPPED
>> >>> > (which really means "not yet started") prior to RUNNING. Is this what we
>> >>> > want?
>> >>> >
>> >>> > - If a job can be paused (a transition from RUNNING to STOPPED), then
>> >>> > waitUntilPaused(Duration) makes sense.
>> >>> >
>> >>> > - Assuming there is some polling under the hood, are runners required to
>> >>> > send back a full history of transitions? Or can transitions be missed, with
>> >>> > only the latest state retrieved?
>> >>> >
>> >>> > - If the latter, does waitUntilRunning() only wait until RUNNING, or
>> >>> > does it also return when it sees STOPPED, which could certainly indicate
>> >>> > that the job transitioned to RUNNING and then STOPPED in between polls? In
>> >>> > that case it is, today, the same as waitUntilStateIsKnown().
>> >>> >
>> >>> > - The limiting case of this discussion is waitUntilState(Duration,
>> >>> > Set<State>), which is the same amount of work to implement. Am I correct
>> >>> > that everyone in this thread thinks this generality is just not the right
>> >>> > thing for a user API?
>> >>> >
>> >>> > - This enum could probably use revision. I'd choose some combination of
>> >>> > tightening the enum, making it extensible, and making some aspect of it
>> >>> > free-form. Not sure where the best balance lies.
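Kenn's question about missed transitions can be made concrete with a sketch of the fully general waitUntilState(Duration, Set<State>) on top of a poller that sees only the latest state. Every name here is hypothetical. The fixed poll budget derived from the timeout, and the scripted job that replays predetermined poll results, are assumptions chosen to keep the example deterministic. Neither describes any runner's actual behavior.

```java
import java.time.Duration;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of waitUntilState(Duration, Set<State>) over a latest-state-only poller.
public class WaitUntilStateSketch {

  enum State { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED }

  /**
   * Polls until the observed state is in targets or the poll budget derived from timeout
   * is exhausted. Returns the last observed state; callers check targets.contains(result).
   */
  static State waitUntilState(Supplier<State> currentState, Set<State> targets,
                              Duration timeout, Duration pollInterval) {
    long intervalMillis = Math.max(1L, pollInterval.toMillis());
    long budget = Math.max(1L, timeout.toMillis() / intervalMillis);
    State state = currentState.get();
    for (long i = 0; i < budget && !targets.contains(state); i++) {
      try {
        Thread.sleep(intervalMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      state = currentState.get(); // each poll sees only the latest state, not the history
    }
    return state;
  }

  /** Replays a fixed sequence of poll results, then keeps repeating the last one. */
  static Supplier<State> scripted(List<State> resultsPerPoll) {
    Iterator<State> it = resultsPerPoll.iterator();
    State[] last = {resultsPerPoll.get(0)};
    return () -> {
      if (it.hasNext()) {
        last[0] = it.next();
      }
      return last[0];
    };
  }

  public static void main(String[] args) {
    Duration timeout = Duration.ofMillis(50);
    Duration poll = Duration.ofMillis(10);
    // Suppose the job went UNKNOWN -> RUNNING -> STOPPED, but RUNNING fell between two polls.
    List<State> observed = List.of(State.UNKNOWN, State.STOPPED);

    State a = waitUntilState(scripted(observed), EnumSet.of(State.RUNNING), timeout, poll);
    System.out.println(a); // STOPPED: RUNNING was never observed, so the wait used its whole budget

    State b = waitUntilState(scripted(observed), EnumSet.of(State.RUNNING, State.STOPPED), timeout, poll);
    System.out.println(b); // STOPPED: returns as soon as any state in the set is seen
  }
}
```

The first call shows the ambiguity Kenn raises. After a timeout, the caller cannot tell whether the job never ran or ran and stopped between polls. The second call shows that accepting STOPPED as well makes the wait return on any known, non-UNKNOWN state.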
>> >>> >
>> >>> >
>> >>> >
>> >>> > On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers wrote:
>> >>> >
>> >>> >> (Minor issue: I'd propose waitUntilDone and waitUntilRunning rather than
>> >>> >> waitToRunning, which reads oddly.)
>> >>> >>
>> >>> >> The only reason to separate submission from waitUntilRunning would be if
>> >>> >> you wanted to kick off several pipelines in quick succession, then wait for
>> >>> >> them all to be running. For instance:
>> >>> >>
>> >>> >> PipelineResult p1Future = p1.run();
>> >>> >> PipelineResult p2Future = p2.run();
>> >>> >> ...
>> >>> >>
>> >>> >> p1Future.waitUntilRunning();
>> >>> >> p2Future.waitUntilRunning();
>> >>> >> ...
>> >>> >>
>> >>> >> In this setup, you can start several pipelines more quickly, but your main
>> >>> >> program would still wait and report any errors before exiting.
>> >>> >>
>> >>> >> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw wrote:
>> >>> >>
>> >>> >> > I'm in favor of the proposal. My only question is whether we need
>> >>> >> > PipelineResult.waitToRunning(); instead, I'd propose that run() block
>> >>> >> > until the pipeline is running/successfully submitted (or failed). This
>> >>> >> > would simplify the API: we'd only have one kind of wait, one that makes
>> >>> >> > sense in all cases.
>> >>> >> >
>> >>> >> > What kinds of interactions would one want to have with the
>> >>> >> > PipelineResult before the pipeline is running?
>> >>> >> >
>> >>> >> > On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh wrote:
>> >>> >> > > TestPipeline is probably the one runner that can be expected to block,
>> >>> >> > > as JUnit tests certainly, and other tests likely, will run the Pipeline
>> >>> >> > > and succeed even if the PipelineRunner throws an exception.
>> >>> >> > > Luckily, this can be added to TestPipeline.run(), which already has
>> >>> >> > > additional behavior associated with it (currently regarding the
>> >>> >> > > unwrapping of AssertionErrors).
>> >>> >> > >
>> >>> >> > > On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles wrote:
>> >>> >> > >
>> >>> >> > >> I like this proposal. It makes pipeline.run() seem like a pretty
>> >>> >> > >> normal async request, and easy to program with. It removes the
>> >>> >> > >> implicit assumption in the prior design that main() is pretty much
>> >>> >> > >> just "build and run a pipeline".
>> >>> >> > >>
>> >>> >> > >> The part of this that I care about most is being able to write a
>> >>> >> > >> program (not the pipeline, but the program that launches one or more
>> >>> >> > >> pipelines) that has reasonable cross-runner behavior.
>> >>> >> > >>
>> >>> >> > >> One comment:
>> >>> >> > >>
>> >>> >> > >> On Wed, Jul 20, 2016 at 3:39 PM, Pei He wrote:
>> >>> >> > >> >
>> >>> >> > >> > 4. PipelineRunner.run() should (but is not required to) do
>> >>> >> > >> > non-blocking runs.
>> >>> >> > >> >
>> >>> >> > >>
>> >>> >> > >> I think we can elaborate on this a little bit. Obviously there might
>> >>> >> > >> be "blocking" in terms of, say, an HTTP round-trip to submit the job,
>> >>> >> > >> but run() should never be non-terminating.
>> >>> >> > >>
>> >>> >> > >> For a test runner that finishes the pipeline quickly, I would be fine
>> >>> >> > >> with run() just executing the pipeline, but the PipelineResult should
>> >>> >> > >> still emulate the usual behavior, just always returning a terminal status.
>> >>> >> > >> It would be annoying to add waitToFinish() to the end of all our tests,
>> >>> >> > >> but ending them with a bare run() makes the tests work only with special
>> >>> >> > >> blocking runner wrappers (and makes them poor examples). A JUnit @Rule
>> >>> >> > >> for TestPipeline would hide all that, perhaps.
>> >>> >> > >>
>> >>> >> > >>
>> >>> >> > >> Kenn
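The thread's two ideas can be sketched together with a result handle backed by a CompletableFuture. The first is a run() that returns promptly while the caller decides when to block. The second is a quick test runner that may block in run() but still returns a terminal result. Ben's pattern of starting several pipelines before waiting on any of them fits the same sketch. Everything here is hypothetical: AsyncRunSketch, Result, runAsync, runBlocking, and the three-value State. Ben's original example called waitUntilRunning(). It is swapped for waitUntilFinish(), the wait the thread agreed on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a non-blocking run() that returns a result handle; not Beam's API.
public class AsyncRunSketch {

  enum State { RUNNING, DONE, FAILED }

  /** Minimal result handle backed by a future that completes with a terminal state. */
  static final class Result {
    private final CompletableFuture<State> terminal;

    Result(CompletableFuture<State> terminal) {
      this.terminal = terminal;
    }

    /** Non-blocking status query. */
    State getState() {
      return terminal.isDone() ? terminal.join() : State.RUNNING;
    }

    /** Blocks until the pipeline reaches a terminal state. */
    State waitUntilFinish() {
      return terminal.join();
    }
  }

  /** Production-style runner: submits the work and returns right away. */
  static Result runAsync(Runnable pipeline, ExecutorService executor) {
    return new Result(CompletableFuture.supplyAsync(() -> execute(pipeline), executor));
  }

  /** Test-style runner: executes inside run(), so the handle is already terminal. */
  static Result runBlocking(Runnable pipeline) {
    return new Result(CompletableFuture.completedFuture(execute(pipeline)));
  }

  /** Runs the pipeline body and maps its outcome to a terminal state. */
  private static State execute(Runnable pipeline) {
    try {
      pipeline.run();
      return State.DONE;
    } catch (RuntimeException e) {
      return State.FAILED;
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      // Ben's pattern: start both pipelines first, then wait on each one.
      Result r1 = runAsync(() -> sleepQuietly(100), executor);
      Result r2 = runAsync(() -> { throw new IllegalStateException("boom"); }, executor);
      System.out.println(r1.waitUntilFinish()); // DONE
      System.out.println(r2.waitUntilFinish()); // FAILED
      // A quick test runner may block in run(), but the handle still reports a terminal state.
      System.out.println(runBlocking(() -> { }).getState()); // DONE
    } finally {
      executor.shutdown();
    }
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Both runners hand back the same kind of handle. Launcher code that calls waitUntilFinish() therefore behaves the same whether run() blocked or not, which is the cross-runner property Kenn asks for.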
