+1 for your proposal, Pei.

On Mon, Jul 25, 2016 at 5:54 PM, Pei He <[email protected]> wrote:
> It looks to me like the following are agreed:
> (1) Add cancel() and waitUntilFinish() to PipelineResult.
> (In streaming mode, "all data watermarks reach infinity" is
> considered finished.)
> (2) PipelineRunner.run() should return relatively quickly, as soon as
> the pipeline/job is started/running. The blocking logic should be left
> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
> runners that finish quickly can block run() until the execution is
> done, so it is cleaner to verify test results after run().)
>
> I will send out a PR for (1), and create JIRA issues to improve runners for
> (2).
>
> waitToRunning() is controversial, and we have several half-agreed
> proposals.
> I will pull them out of this thread, so we can close this proposal
> with cancel() and waitUntilFinish(). And I will create a JIRA issue
> to track how to support "waiting until other states".
>
> Does that sound good to everyone?
>
> Thanks
> --
> Pei
>
> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
> <[email protected]> wrote:
> > On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers <[email protected]>
> > wrote:
> >> This health check seems redundant with just waiting a while and then
> >> checking on the status, other than returning earlier in the case of
> >> reaching a terminal state. What about adding:
> >>
> >> /**
> >>  * Returns the state after waiting the specified duration. Will return
> >>  * earlier if the pipeline reaches a terminal state.
> >>  */
> >> State getStateAfter(Duration duration);
> >>
> >> This seems to be a useful building block, both for the user's pipeline (in
> >> case they wanted to build something like wait and then check health) and
> >> also for the SDK (to implement waitUntilFinished, etc.)
> >
> > A generic waitFor(Duration) which may return early if a terminal state
> > is entered seems useful.
> > I don't know that we need a return value
> > here, given that we can then query the PipelineResult however we want
> > once this returns. waitUntilFinished is simply
> > waitFor(InfiniteDuration).
> >
> >> On Thu, Jul 21, 2016 at 4:11 PM Pei He <[email protected]>
> >> wrote:
> >>
> >>> I am not in favor of supporting waiting for every state, or
> >>> waitUntilState(...).
> >>> One reason is that PipelineResult.State is not well defined and is not
> >>> agreed upon across runners.
> >>> Another reason is that users might not want to wait for a particular state.
> >>> For example,
> >>> waitUntilFinish() is to wait for a terminal state.
> >>> So, even if runners have different states, we can still define shared
> >>> properties, such as finished/terminal.
> >
> > +1. Running is an intermediate state that doesn't have an obvious
> > mapping onto all runners, which is another reason it's odd to wait
> > until then. All runners have terminal states.
> >
> >>> I think when users call waitUntilRunning(), they want to make sure the
> >>> pipeline is up and running and is healthy. Maybe we want to wait for at
> >>> least one element to go through the pipeline.
> >
> > -1. That might be a while... Also, you may not start generating data
> > until your pipeline is up.
> >
> >>> What about changing waitUntilRunning() to the following?
> >>>
> >>> /**
> >>>  * Check if the pipeline is healthy for the duration.
> >>>  *
> >>>  * Return true if the pipeline is healthy at the end of the duration.
> >>>  * Return false if the pipeline is not healthy at the end of the duration.
> >>>  * <p>It may return early if the pipeline is in an unrecoverable failure
> >>>  * state.
> >>>  */
> >>> boolean PipelineResult.healthCheck(Duration duration)
> >>>
> >>> (I think this also addresses Robert's comment about waitToRunning().)
> >>>
> >>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles <[email protected]>
> >>> wrote:
> >>> > Some more comments:
> >>> >
> >>> > - What are the allowed/expected state transitions prior to RUNNING?
> >>> > Today, I presume it is any nonterminal state, so it can be UNKNOWN or
> >>> > STOPPED (which really means "not yet started") prior to RUNNING. Is
> >>> > this what we want?
> >>> >
> >>> > - If a job can be paused (a transition from RUNNING to STOPPED), then
> >>> > waitUntilPaused(Duration) makes sense.
> >>> >
> >>> > - Assuming there is some polling under the hood, are runners required
> >>> > to send back a full history of transitions? Or can transitions be
> >>> > missed, with only the latest state retrieved?
> >>> >
> >>> > - If the latter, then does waitUntilRunning() only wait until RUNNING,
> >>> > or does it also return when it sees STOPPED, which could certainly
> >>> > indicate that the job transitioned to RUNNING then STOPPED in between
> >>> > polls? In that case it is, today, the same as waitUntilStateIsKnown().
> >>> >
> >>> > - The obvious limit of this discussion is waitUntilState(Duration,
> >>> > Set<State>), which is the same amount of work to implement. Am I
> >>> > correct that everyone in this thread thinks this generality is just
> >>> > not the right thing for a user API?
> >>> >
> >>> > - This enum could probably use revision. I'd choose some combination
> >>> > of tightening the enum, making it extensible, and making some aspect
> >>> > of it free-form. Not sure where the best balance lies.
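The timed wait discussed above (Robert's waitFor(Duration), which may return early once a terminal state is entered, with waitUntilFinish() as the unbounded case) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Beam's PipelineResult. JobState, PollingResult, and setState() are hypothetical names. java.time.Duration stands in for the SDK's duration type. waitFor() returns the state it last observed, in the spirit of Ben's getStateAfter(Duration), even though Robert suggests the return value may be unnecessary.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical subset of job states; only the terminal/non-terminal split matters here. */
enum JobState {
  UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED;

  boolean isTerminal() {
    return this == DONE || this == FAILED || this == CANCELLED;
  }
}

/** Hypothetical result handle; a runner thread reports progress through setState(). */
final class PollingResult {
  // STOPPED here means "not yet started", as described in the thread.
  private JobState state = JobState.STOPPED;

  synchronized JobState getState() {
    return state;
  }

  synchronized void setState(JobState newState) {
    state = newState;
    notifyAll(); // wake any thread blocked in waitFor() or waitUntilFinish()
  }

  /**
   * Waits up to {@code duration}, returning early if a terminal state is entered.
   * Returns whatever state is current when the wait ends.
   */
  synchronized JobState waitFor(Duration duration) {
    long deadline = System.nanoTime() + duration.toNanos();
    try {
      while (!state.isTerminal()) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0) {
          break; // timed out while still non-terminal
        }
        // Releases the monitor while waiting; the loop also absorbs spurious wakeups.
        TimeUnit.NANOSECONDS.timedWait(this, remaining);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
    }
    return state;
  }

  /** The unbounded case: blocks until a terminal state is entered (or the thread is interrupted). */
  synchronized JobState waitUntilFinish() {
    try {
      while (!state.isTerminal()) {
        wait();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return state;
  }
}
```

java.time.Duration has no infinite value, and Duration.toNanos() throws ArithmeticException on overflow. For that reason, the sketch gives waitUntilFinish() its own unbounded loop instead of calling waitFor() with an enormous duration. Waiting only on isTerminal() also sidesteps the polling question, assuming terminal states are final: a missed intermediate transition cannot hide a terminal state.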
> >>> >
> >>> > On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers <[email protected]>
> >>> > wrote:
> >>> >
> >>> >> (Minor issue: I'd propose waitUntilDone and waitUntilRunning rather
> >>> >> than waitToRunning, which reads oddly.)
> >>> >>
> >>> >> The only reason to separate submission from waitUntilRunning would be
> >>> >> if you wanted to kick off several pipelines in quick succession, then
> >>> >> wait for them all to be running. For instance:
> >>> >>
> >>> >> PipelineResult p1Future = p1.run();
> >>> >> PipelineResult p2Future = p2.run();
> >>> >> ...
> >>> >>
> >>> >> p1Future.waitUntilRunning();
> >>> >> p2Future.waitUntilRunning();
> >>> >> ...
> >>> >>
> >>> >> In this setup, you can start several pipelines more quickly, but your
> >>> >> main program would wait and report any errors before exiting.
> >>> >>
> >>> >> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
> >>> >> <[email protected]> wrote:
> >>> >>
> >>> >> > I'm in favor of the proposal. My only question is whether we need
> >>> >> > PipelineResult.waitToRunning(); instead, I'd propose that run() block
> >>> >> > until the pipeline is running/successfully submitted (or failed).
> >>> >> > This would simplify the API: we'd only have one kind of wait that
> >>> >> > makes sense in all cases.
> >>> >> >
> >>> >> > What kinds of interactions would one want to have with the
> >>> >> > PipelineResult before it's running?
> >>> >> >
> >>> >> > On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh <[email protected]>
> >>> >> > wrote:
> >>> >> > > TestPipeline is probably the one runner that can be expected to
> >>> >> > > block, as certainly JUnit tests, and likely other tests, will run
> >>> >> > > the Pipeline and succeed even if the PipelineRunner throws an
> >>> >> > > exception.
> >>> >> > > Luckily, this can be added to TestPipeline.run(), which already has
> >>> >> > > additional behavior associated with it (currently regarding the
> >>> >> > > unwrapping of AssertionErrors).
> >>> >> > >
> >>> >> > > On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles <[email protected]>
> >>> >> > > wrote:
> >>> >> > >
> >>> >> > >> I like this proposal. It makes pipeline.run() seem like a pretty
> >>> >> > >> normal async request, and easy to program with. It removes the
> >>> >> > >> implicit assumption in the prior design that main() is pretty much
> >>> >> > >> just "build and run a pipeline".
> >>> >> > >>
> >>> >> > >> The part of this that I care about most is being able to write a
> >>> >> > >> program (not the pipeline, but the program that launches one or
> >>> >> > >> more pipelines) that has reasonable cross-runner behavior.
> >>> >> > >>
> >>> >> > >> One comment:
> >>> >> > >>
> >>> >> > >> On Wed, Jul 20, 2016 at 3:39 PM, Pei He <[email protected]>
> >>> >> > >> wrote:
> >>> >> > >> >
> >>> >> > >> > 4. PipelineRunner.run() should (but is not required to) do
> >>> >> > >> > non-blocking runs
> >>> >> > >> >
> >>> >> > >>
> >>> >> > >> I think we can elaborate on this a little bit. Obviously there
> >>> >> > >> might be "blocking" in terms of, say, an HTTP round-trip to submit
> >>> >> > >> the job, but run() should never be non-terminating.
> >>> >> > >>
> >>> >> > >> For a test runner that finishes the pipeline quickly, I would be
> >>> >> > >> fine with run() just executing the pipeline, but the PipelineResult
> >>> >> > >> should still emulate the usual behavior, just always returning a
> >>> >> > >> terminal status.
> >>> >> > >> It would be annoying to add waitToFinish() to the end of all our
> >>> >> > >> tests, but leaving a bare run() makes the tests only work with
> >>> >> > >> special blocking runner wrappers (and makes them poor examples). A
> >>> >> > >> JUnit @Rule for test pipelines would hide all that, perhaps.
> >>> >> > >>
> >>> >> > >> Kenn
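The agreed direction at the top of the thread can be illustrated with plain java.util.concurrent. In that direction, run() returns once the job is submitted, and callers either block with waitUntilFinish() or stop the job with cancel(). The sketch also follows Ben's pattern of launching several pipelines before waiting. It is hedged and is not Beam code. AsyncRunner, AsyncResult, and RunState are hypothetical names. An ExecutorService stands in for a real execution service, and each "pipeline" is just a Runnable.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical states for this sketch; DONE, FAILED, and CANCELLED are terminal. */
enum RunState { RUNNING, DONE, FAILED, CANCELLED }

/** Hypothetical handle that run() returns as soon as the work is submitted. */
final class AsyncResult {
  private final Future<?> job;

  AsyncResult(Future<?> job) {
    this.job = job;
  }

  RunState getState() {
    if (!job.isDone()) {
      return RunState.RUNNING;
    }
    if (job.isCancelled()) {
      return RunState.CANCELLED;
    }
    try {
      job.get(); // the job has finished, so get() only reports how it ended
      return RunState.DONE;
    } catch (ExecutionException e) {
      return RunState.FAILED;
    } catch (InterruptedException e) {
      // Rare: completion was still being published; report it as not yet finished.
      Thread.currentThread().interrupt();
      return RunState.RUNNING;
    }
  }

  /** Requests cancellation (interrupting the worker) and returns the resulting state. */
  RunState cancel() {
    job.cancel(true);
    return getState();
  }

  /** Blocks until the job reaches a terminal state, then reports which one. */
  RunState waitUntilFinish() {
    try {
      job.get();
    } catch (CancellationException | ExecutionException e) {
      // Terminal either way; getState() distinguishes CANCELLED from FAILED.
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return getState();
  }
}

/** Hypothetical runner whose run() submits the work and returns without waiting. */
final class AsyncRunner {
  private final ExecutorService executor = Executors.newCachedThreadPool();

  AsyncResult run(Runnable pipelineBody) {
    return new AsyncResult(executor.submit(pipelineBody));
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```

Usage mirrors Ben's example, with the agreed waitUntilFinish() in place of waitUntilRunning(). Call run() on each pipeline first, then wait on each result. The pipelines execute concurrently, while main() still observes failures before exiting.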

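Kenn and Thomas describe a test-runner case. There, run() can execute the pipeline synchronously and hand back a result that is already terminal. Assertions can then follow run() directly, and waitUntilFinish() returns immediately. The sketch below is hypothetical. BlockingTestRunner, CompletedResult, and TestRunState are illustrative names. runAndRethrow() only approximates the idea of a blocking TestPipeline.run() that surfaces failures so a JUnit test cannot pass silently; it is not TestPipeline's actual code.

```java
/** Hypothetical terminal states for a runner that executes pipelines synchronously. */
enum TestRunState { DONE, FAILED }

/** Hypothetical result that is already terminal by the time run() returns. */
final class CompletedResult {
  private final TestRunState state;
  private final Throwable failure; // null when state == DONE

  CompletedResult(TestRunState state, Throwable failure) {
    this.state = state;
    this.failure = failure;
  }

  TestRunState getState() {
    return state;
  }

  Throwable getFailure() {
    return failure;
  }

  /** Nothing to wait for: the pipeline already finished inside run(). */
  TestRunState waitUntilFinish() {
    return state;
  }
}

/** Hypothetical test runner that blocks in run() until execution is done. */
final class BlockingTestRunner {
  private BlockingTestRunner() {}

  static CompletedResult run(Runnable pipelineBody) {
    try {
      pipelineBody.run();
      return new CompletedResult(TestRunState.DONE, null);
    } catch (RuntimeException | AssertionError e) {
      // Record the failure instead of losing it, so the caller can inspect or rethrow it.
      return new CompletedResult(TestRunState.FAILED, e);
    }
  }

  /** Blocks and rethrows failures, so a test that only calls this cannot pass silently. */
  static void runAndRethrow(Runnable pipelineBody) {
    CompletedResult result = run(pipelineBody);
    if (result.getState() == TestRunState.FAILED) {
      throw new AssertionError("pipeline failed", result.getFailure());
    }
  }
}
```

Because the returned result always reports a terminal state, the same test code would still read correctly if it later called waitUntilFinish() against an asynchronous runner.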