+1 and Thanks!

On Tue, Jul 26, 2016 at 2:01 AM Robert Bradshaw <[email protected]> wrote:
> +1, sounds great. Thanks Pei.
>
> On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik <[email protected]> wrote:
> > +1 for your proposal Pei
> >
> > On Mon, Jul 25, 2016 at 5:54 PM, Pei He <[email protected]> wrote:
> >
> >> Looks to me that the following are agreed:
> >> (1). adding cancel() and waitUntilFinish() to PipelineResult.
> >> (In streaming mode, "all data watermarks reach infinity" is
> >> considered finished.)
> >> (2). PipelineRunner.run() should return relatively quickly, as soon as
> >> the pipeline/job is started/running. The blocking logic should be left
> >> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
> >> runners that finish quickly can block run() until the execution is
> >> done. So, it is cleaner to verify test results after run())
> >>
> >> I will send out a PR for (1), and create jira issues to improve runners
> >> for (2).
> >>
> >> waitToRunning() is controversial, and we have several half-agreed
> >> proposals.
> >> I will pull them out from this thread, so we can close this proposal
> >> with cancel() and waitUntilFinish(). And, I will create a jira issue
> >> to track how to support "waiting until other states".
> >>
> >> Does that sound good to everyone?
> >>
> >> Thanks
> >> --
> >> Pei
> >>
> >> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
> >> <[email protected]> wrote:
> >> > On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers <[email protected]> wrote:
> >> >> This health check seems redundant with just waiting a while and then
> >> >> checking on the status, other than returning earlier in the case of
> >> >> reaching a terminal state. What about adding:
> >> >>
> >> >> /**
> >> >>  * Returns the state after waiting the specified duration. Will return
> >> >>  * earlier if the pipeline reaches a terminal state.
> >> >>  */
> >> >> State getStateAfter(Duration duration);
> >> >>
> >> >> This seems to be a useful building block, both for the user's pipeline (in
> >> >> case they wanted to build something like wait and then check health) and
> >> >> also for the SDK (to implement waitUntilFinished, etc.)
> >> >
> >> > A generic waitFor(Duration) which may return early if a terminal state
> >> > is entered seems useful. I don't know that we need a return value
> >> > here, given that we can then query the PipelineResult however we want
> >> > once this returns. waitUntilFinished is simply
> >> > waitFor(InfiniteDuration).
> >> >
> >> >> On Thu, Jul 21, 2016 at 4:11 PM Pei He <[email protected]> wrote:
> >> >>
> >> >>> I am not in favor of supporting a wait for every state or
> >> >>> waitUntilState(...).
> >> >>> One reason is PipelineResult.State is not well defined and is not
> >> >>> agreed upon across runners.
> >> >>> Another reason is users might not want to wait for a particular state.
> >> >>> For example,
> >> >>> waitUntilFinish() is to wait for a terminal state.
> >> >>> So, even if runners have different states, we still can define shared
> >> >>> properties, such as finished/terminal.
> >> >
> >> > +1. Running is an intermediate state that doesn't have an obvious
> >> > mapping onto all runners, which is another reason it's odd to wait
> >> > until then. All runners have terminal states.
> >> >
> >> >>> I think when users call waitUntilRunning(), they want to make sure the
> >> >>> pipeline is up and running and is healthy. Maybe we want to wait until at
> >> >>> least one element has gone through the pipeline.
> >> >
> >> > -1, That might be a while... Also, you may not start generating data
> >> > until your pipeline is up.
> >> >
> >> >>> What about changing the waitUntilRunning() to the following?
> >> >>>
> >> >>> /**
> >> >>>  * Check if the pipeline is healthy for the duration.
> >> >>>  *
> >> >>>  * Return true if the pipeline is healthy at the end of duration.
> >> >>>  * Return false if the pipeline is not healthy at the end of duration.
> >> >>>  * <p>It may return early if the pipeline is in an unrecoverable failure
> >> >>>  * state.
> >> >>>  */
> >> >>> boolean PipelineResult.healthCheck(Duration duration)
> >> >>>
> >> >>> (I think this also addresses Robert's comment about waitToRunning())
> >> >>>
> >> >>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles <[email protected]> wrote:
> >> >>> > Some more comments:
> >> >>> >
> >> >>> > - What are the allowed/expected state transitions prior to RUNNING? Today,
> >> >>> > I presume it is any nonterminal state, so it can be UNKNOWN or STOPPED
> >> >>> > (which really means "not yet started") prior to RUNNING. Is this what we
> >> >>> > want?
> >> >>> >
> >> >>> > - If a job can be paused, a transition from RUNNING to STOPPED, then
> >> >>> > waitUntilPaused(Duration) makes sense.
> >> >>> >
> >> >>> > - Assuming there is some polling under the hood, are runners required to
> >> >>> > send back a full history of transitions? Or can transitions be missed, with
> >> >>> > only the latest state retrieved?
> >> >>> >
> >> >>> > - If the latter, then does waitUntilRunning() only wait until RUNNING or
> >> >>> > does it also return when it sees STOPPED, which could certainly indicate
> >> >>> > that the job transitioned to RUNNING then STOPPED in between polls? In that
> >> >>> > case it is, today, the same as waitUntilStateIsKnown().
> >> >>> >
> >> >>> > - The obvious limit of this discussion is waitUntilState(Duration,
> >> >>> > Set<State>), which is the same amount of work to implement. Am I correct
> >> >>> > that everyone in this thread thinks this generality is just not the right
> >> >>> > thing for a user API?
> >> >>> >
> >> >>> > - This enum could probably use revision.
> >> >>> > I'd choose some combination of
> >> >>> > tightening the enum, making it extensible, and making some aspect of it
> >> >>> > free-form. Not sure where the best balance lies.
> >> >>> >
> >> >>> > On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers <[email protected]> wrote:
> >> >>> >
> >> >>> >> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning rather than
> >> >>> >> waitToRunning which reads oddly)
> >> >>> >>
> >> >>> >> The only reason to separate submission from waitUntilRunning would be if
> >> >>> >> you wanted to kick off several pipelines in quick succession, then wait for
> >> >>> >> them all to be running. For instance:
> >> >>> >>
> >> >>> >> PipelineResult p1Future = p1.run();
> >> >>> >> PipelineResult p2Future = p2.run();
> >> >>> >> ...
> >> >>> >>
> >> >>> >> p1Future.waitUntilRunning();
> >> >>> >> p2Future.waitUntilRunning();
> >> >>> >> ...
> >> >>> >>
> >> >>> >> In this setup, you can more quickly start several pipelines, but your main
> >> >>> >> program would wait and report any errors before exiting.
> >> >>> >>
> >> >>> >> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
> >> >>> >> <[email protected]> wrote:
> >> >>> >>
> >> >>> >> > I'm in favor of the proposal. My only question is whether we need
> >> >>> >> > PipelineResult.waitToRunning(); instead I'd propose that run() block
> >> >>> >> > until the pipeline's running/successfully submitted (or failed). This
> >> >>> >> > would simplify the API--we'd only have one kind of wait that makes
> >> >>> >> > sense in all cases.
> >> >>> >> >
> >> >>> >> > What kinds of interactions would one want to have with the
> >> >>> >> > PipelineResults before it's running?
> >> >>> >> >
> >> >>> >> > On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh <[email protected]> wrote:
> >> >>> >> > > TestPipeline is probably the one runner that can be expected to block, as
> >> >>> >> > > certainly JUnit tests and likely other tests will run the Pipeline, and
> >> >>> >> > > succeed, even if the PipelineRunner throws an exception. Luckily, this can
> >> >>> >> > > be added to TestPipeline.run(), which already has additional behavior
> >> >>> >> > > associated with it (currently regarding the unwrapping of AssertionErrors)
> >> >>> >> > >
> >> >>> >> > > On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles <[email protected]> wrote:
> >> >>> >> > >
> >> >>> >> > >> I like this proposal. It makes pipeline.run() seem like a pretty normal
> >> >>> >> > >> async request, and easy to program with. It removes the implicit assumption
> >> >>> >> > >> in the prior design that main() is pretty much just "build and run a
> >> >>> >> > >> pipeline".
> >> >>> >> > >>
> >> >>> >> > >> The part of this that I care about most is being able to write a program
> >> >>> >> > >> (not the pipeline, but the program that launches one or more pipelines)
> >> >>> >> > >> that has reasonable cross-runner behavior.
> >> >>> >> > >>
> >> >>> >> > >> One comment:
> >> >>> >> > >>
> >> >>> >> > >> On Wed, Jul 20, 2016 at 3:39 PM, Pei He <[email protected]> wrote:
> >> >>> >> > >> >
> >> >>> >> > >> > 4. PipelineRunner.run() should (but is not required to) do non-blocking runs
> >> >>> >> > >> >
> >> >>> >> > >>
> >> >>> >> > >> I think we can elaborate on this a little bit.
> >> >>> >> > >> Obviously there might be
> >> >>> >> > >> "blocking" in terms of, say, an HTTP round-trip to submit the job, but
> >> >>> >> > >> run() should never be non-terminating.
> >> >>> >> > >>
> >> >>> >> > >> For a test runner that finishes the pipeline quickly, I would be fine with
> >> >>> >> > >> run() just executing the pipeline, but the PipelineResult should still
> >> >>> >> > >> emulate the usual - just always returning a terminal status. It would be
> >> >>> >> > >> annoying to add waitToFinish() to the end of all our tests, but leaving a
> >> >>> >> > >> run() makes the tests only work with special blocking runner wrappers (and
> >> >>> >> > >> makes them poor examples). A JUnit @Rule for test pipeline would hide all
> >> >>> >> > >> that, perhaps.
> >> >>> >> > >>
> >> >>> >> > >> Kenn
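
For anyone reading this thread later, here is a rough, self-contained sketch of how the pieces discussed above could fit together: a handle with cancel() and waitUntilFinish(), plus Ben's getStateAfter(Duration) as the building block that waitUntilFinish() delegates to. This is not the actual SDK code. The nested PipelineResult and State types, the isTerminal() flag, the FakeResult class, the 10 ms polling interval, the use of java.time.Duration, and the "one year" stand-in for an infinite duration are all assumptions made only for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: every type in here is hypothetical, not the SDK's. */
final class PipelineWaitSketch {

  /** Hypothetical states; only the terminal/non-terminal split matters. */
  enum State {
    UNKNOWN(false), RUNNING(false), DONE(true), FAILED(true), CANCELLED(true);

    private final boolean terminal;

    State(boolean terminal) {
      this.terminal = terminal;
    }

    boolean isTerminal() {
      return terminal;
    }
  }

  /** Hypothetical handle returned by a non-blocking run(). */
  interface PipelineResult {
    State getState();

    State cancel();

    /** Polls until a terminal state is seen or the duration elapses. */
    default State getStateAfter(Duration duration) throws InterruptedException {
      long deadline = System.nanoTime() + duration.toNanos();
      State state = getState();
      while (!state.isTerminal() && System.nanoTime() - deadline < 0) {
        Thread.sleep(10); // polling interval is an arbitrary choice
        state = getState();
      }
      return state;
    }

    /** Waits for a terminal state; one year stands in for "infinite". */
    default State waitUntilFinish() throws InterruptedException {
      return getStateAfter(Duration.ofDays(365));
    }
  }

  /** Fake job that reports RUNNING for a fixed number of polls, then DONE. */
  static final class FakeResult implements PipelineResult {
    private final AtomicInteger pollsLeft;
    private volatile boolean cancelled;

    FakeResult(int pollsUntilDone) {
      this.pollsLeft = new AtomicInteger(pollsUntilDone);
    }

    @Override
    public State getState() {
      if (cancelled) {
        return State.CANCELLED;
      }
      return pollsLeft.getAndDecrement() > 0 ? State.RUNNING : State.DONE;
    }

    @Override
    public State cancel() {
      cancelled = true;
      return State.CANCELLED;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Start two "jobs" without blocking, then decide how long to wait for each.
    PipelineResult quick = new FakeResult(3);
    PipelineResult slow = new FakeResult(1_000_000);

    System.out.println("quick: " + quick.waitUntilFinish());
    System.out.println("slow after 50 ms: " + slow.getStateAfter(Duration.ofMillis(50)));
    System.out.println("slow after cancel: " + slow.cancel());
  }
}
```

The sketch leaves all blocking to the caller, in line with point (2) of Pei's summary: run() would hand back the handle right away, and a launcher program chooses between a bounded wait, an unbounded wait, or a cancel.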
