+1, sounds great. Thanks Pei.

On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik <[email protected]> wrote:
> +1 for your proposal Pei
>
> On Mon, Jul 25, 2016 at 5:54 PM, Pei He <[email protected]> wrote:
>
>> Looks to me that followings are agreed:
>> (1). adding cancel() and waitUntilFinish() to PipelineResult.
>> (In streaming mode, "all data watermarks reach to infinity" is
>> considered as finished.)
>> (2). PipelineRunner.run() should return relatively quick as soon as
>> the pipeline/job is started/running. The blocking logic should be left
>> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
>> runners that finish quickly can block run() until the execution is
>> done. So, it is cleaner to verify test results after run())
>>
>> I will send out PR for (1), and create jira issues to improve runners for
>> (2).
>>
>> waitToRunning() is controversial, and we have several half way agreed
>> proposals.
>> I will pull them out from this thread, so we can close this proposal
>> with cancel() and waitUntilFinish(). And, i will create a jira issue
>> to track how to support ''waiting until other states".
>>
>> Does that sound good with anyone?
>>
>> Thanks
>> --
>> Pei
>>
>> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
>> <[email protected]> wrote:
>> > On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers <[email protected]>
>> wrote:
>> >> This health check seems redundant with just waiting a while and then
>> >> checking on the status, other than returning earlier in the case of
>> >> reaching a terminal state. What about adding:
>> >>
>> >> /**
>> >>  * Returns the state after waiting the specified duration. Will return
>> >> earlier if the pipeline
>> >>  * reaches a terminal state.
>> >>  */
>> >> State getStateAfter(Duration duration);
>> >>
>> >> This seems to be a useful building block, both for the user's pipeline
>> (in
>> >> case they wanted to build something like wait and then check health) and
>> >> also for the SDK (to implement waitUntilFinished, etc.)
>> >
>> > A generic waitFor(Duration) which may return early if a terminal state
>> > is entered seems useful. I don't know that we need a return value
>> > here, given that we an then query the PipelineResult however we want
>> > once this returns. waitUntilFinished is simply
>> > waitFor(InfiniteDuration).
>> >
>> >> On Thu, Jul 21, 2016 at 4:11 PM Pei He <[email protected]>
>> wrote:
>> >>
>> >>> I am not in favor of supporting wait for every states or
>> >>> waitUntilState(...).
>> >>> One reason is PipelineResult.State is not well defined and is not
>> >>> agreed upon runners.
>> >>> Another reason is users might not want to wait for a particular state.
>> >>> For example,
>> >>> waitUntilFinish() is to wait for a terminal state.
>> >>> So, even runners have different states, we still can define shared
>> >>> properties, such as finished/terminal.
>> >
>> > +1. Running is an intermediate state that doesn't have an obvious
>> > mapping onto all runners, which is another reason it's odd to wait
>> > until then. All runners have terminal states.
>> >
>> >>> I think when users call waitUntilRunning(), they want to make sure the
>> >>> pipeline is up running and is healthy.
>> >> > Maybe we want to wait for at
>> >>> least one element went through the pipeline.
>> >
>> > -1, That might be a while... Also, you may not start generating data
>> > until you pipline is up.
>> >
>> >>> What about changing the waitUntilRunning() to the following?
>> >>>
>> >>> /**
>> >>> * Check if the pipeline is health for the duration.
>> >>> *
>> >>> * Return true if the pipeline is healthy at the end of duration.
>> >>> * Return false if the pipeline is not healthy at the end of duration.
>> >>> * <p>It may return early if the pipeline is in an unrecoverable failure
>> >>> state.
>> >>> */
>> >>> boolean PipelineResult.healthCheck(Duration duration)
>> >>>
>> >>> (I think this also addressed Robert's comment about waitToRunning())
>> >>>
>> >>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles
>> <[email protected]>
>> >>> wrote:
>> >>> > Some more comments:
>> >>> >
>> >>> >  - What are the allowed/expected state transitions prior to RUNNING?
>> >>> Today,
>> >>> > I presume it is any nonterminal state, so it can be UNKNOWN or
>> STOPPED
>> >>> > (which really means "not yet started") prior to RUNNING. Is this
>> what we
>> >>> > want?
>> >>> >
>> >>> >  - If a job can be paused, a transition from RUNNING to STOPPED, then
>> >>> > waitUntilPaused(Duration) makes sense.
>> >>> >
>> >>> >  - Assuming there is some polling under the hood, are runners
>> required to
>> >>> > send back a full history of transitions? Or can transitions be
>> missed,
>> >>> with
>> >>> > only the latest state retrieved?
>> >>> >
>> >>> >  - If the latter, then does waitUntilRunning() only wait until
>> RUNNING or
>> >>> > does it also return when it sees STOPPED, which could certainly
>> indicate
>> >>> > that the job transitioned to RUNNING then STOPPED in between polls.
>> In
>> >>> that
>> >>> > case it is, today, the same as waitUntilStateIsKnown().
>> >>> >
>> >>> >  - The obvious limit of this discussion is waitUntilState(Duration,
>> >>> > Set<State>), which is the same amount of work to implement. Am I
>> correct
>> >>> > that everyone in this thread thinks this generality is just not the
>> right
>> >>> > thing for a user API?
>> >>> >
>> >>> >  - This enum could probably use revision. I'd chose some combination
>> of
>> >>> > tightening the enum, making it extensible, and make some aspect of it
>> >>> > free-form. Not sure where the best balance lies.
>> >>> >
>> >>> >
>> >>> >
>> >>> > On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers
>> >>> <[email protected]
>> >>> >> wrote:
>> >>> >
>> >>> >> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning rather
>> than
>> >>> >> waitToRunning which reads oddly)
>> >>> >>
>> >>> >> The only reason to separate submission from waitUntilRunning would
>> be if
>> >>> >> you wanted to kick off several pipelines in quick succession, then
>> wait
>> >>> for
>> >>> >> them all to be running. For instance:
>> >>> >>
>> >>> >> PipelineResult p1Future = p1.run();
>> >>> >> PipelineResult p2Future = p2.run();
>> >>> >> ...
>> >>> >>
>> >>> >> p1Future.waitUntilRunning();
>> >>> >> p2Future.waitUntilRunning();
>> >>> >> ...
>> >>> >>
>> >>> >> In this setup, you can more quickly start several pipelines, but
>> your
>> >>> main
>> >>> >> program would wait and report any errors before exiting.
>> >>> >>
>> >>> >> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
>> >>> >> <[email protected]> wrote:
>> >>> >>
>> >>> >> > I'm in favor of the proposal. My only question is whether we need
>> >>> >> > PipelineResult.waitToRunning(), instead I'd propose that run()
>> block
>> >>> >> > until the pipeline's running/successfully submitted (or failed).
>> This
>> >>> >> > would simplify the API--we'd only have one kind of wait that makes
>> >>> >> > sense in all cases.
>> >>> >> >
>> >>> >> > What kinds of interactions would one want to have with the
>> >>> >> > PipelineResults before it's running?
>> >>> >> >
>> >>> >> > On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh
>> >>> <[email protected]>
>> >>> >> > wrote:
>> >>> >> > > TestPipeline is probably the one runner that can be expected to
>> >>> block,
>> >>> >> as
>> >>> >> > > certainly JUnit tests and likely other tests will run the
>> Pipeline,
>> >>> and
>> >>> >> > > succeed, even if the PipelineRunner throws an exception.
>> Luckily,
>> >>> this
>> >>> >> > can
>> >>> >> > > be added to TestPipeline.run(), which already has additional
>> >>> behavior
>> >>> >> > > associated with it (currently regarding the unwrapping of
>> >>> >> > AssertionErrors)
>> >>> >> > >
>> >>> >> > > On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles
>> >>> >> <[email protected]
>> >>> >> > >
>> >>> >> > > wrote:
>> >>> >> > >
>> >>> >> > >> I like this proposal. It makes pipeline.run() seem like a
>> pretty
>> >>> >> normal
>> >>> >> > >> async request, and easy to program with. It removes the
>> implicit
>> >>> >> > assumption
>> >>> >> > >> in the prior design that main() is pretty much just "build and
>> run
>> >>> a
>> >>> >> > >> pipeline".
>> >>> >> > >>
>> >>> >> > >> The part of this that I care about most is being able to write
>> a
>> >>> >> program
>> >>> >> > >> (not the pipeline, but the program that launches one or more
>> >>> >> pipelines)
>> >>> >> > >> that has reasonable cross-runner behavior.
>> >>> >> > >>
>> >>> >> > >> One comment:
>> >>> >> > >>
>> >>> >> > >> On Wed, Jul 20, 2016 at 3:39 PM, Pei He
>> <[email protected]>
>> >>> >> > wrote:
>> >>> >> > >> >
>> >>> >> > >> > 4. PipelineRunner.run() should (but not required) do
>> non-blocking
>> >>> >> runs
>> >>> >> > >> >
>> >>> >> > >>
>> >>> >> > >> I think we can elaborate on this a little bit. Obviously there
>> >>> might
>> >>> >> be
>> >>> >> > >> "blocking" in terms of, say, an HTTP round-trip to submit the
>> job,
>> >>> but
>> >>> >> > >> run() should never be non-terminating.
>> >>> >> > >>
>> >>> >> > >> For a test runner that finishes the pipeline quickly, I would
>> be
>> >>> fine
>> >>> >> > with
>> >>> >> > >> run() just executing the pipeline, but the PipelineResult
>> should
>> >>> still
>> >>> >> > >> emulate the usual - just always returning a terminal status. It
>> >>> would
>> >>> >> be
>> >>> >> > >> annoying to add waitToFinish() to the end of all our tests, but
>> >>> >> leaving
>> >>> >> > a
>> >>> >> > >> run() makes the tests only work with special blocking runner
>> >>> wrappers
>> >>> >> > (and
>> >>> >> > >> make them poor examples). A JUnit @Rule for test pipeline would
>> >>> hide
>> >>> >> all
>> >>> >> > >> that, perhaps.
>> >>> >> > >>
>> >>> >> > >>
>> >>> >> > >> Kenn
>> >>> >> > >>
>> >>> >> >
>> >>> >>
>> >>>
>>

Reply via email to