Hi,
I just want to bump this thread and bring it to everyone's attention.

PipelineResult now has cancel() and waitUntilFinish(). However, currently
only DataflowRunner supports them, in DataflowPipelineJob.

We agreed that users should call "p.run().waitUntilFinish()" if they want to
block. But if they do that now, the direct, Flink, and Spark runners will
throw exceptions.
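
To make the agreed contract concrete, here is a minimal, self-contained
sketch. It does not use the Beam SDK: State, Result, and run(Runnable) are
hypothetical stand-ins for PipelineResult.State, PipelineResult, and
PipelineRunner.run(), and the thread-based toy runner is only an assumption
about how a runner might behave. It shows run() returning as soon as the
work is started, with blocking and cancellation left to the caller.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class WaitUntilFinishSketch {
    // Hypothetical stand-in for PipelineResult.State; not Beam's real enum.
    public enum State {
        RUNNING, DONE, FAILED, CANCELLED;

        public boolean isTerminal() {
            return this != RUNNING;
        }
    }

    // Hypothetical stand-in for PipelineResult with the two new methods.
    public interface Result {
        State getState();
        State cancel();
        State waitUntilFinish();
    }

    // Toy runner: starts the work on a background thread and returns at once.
    public static Result run(Runnable work) {
        AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
        CountDownLatch finished = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                work.run();
                state.compareAndSet(State.RUNNING, State.DONE);
            } catch (RuntimeException e) {
                state.compareAndSet(State.RUNNING, State.FAILED);
            } finally {
                finished.countDown();
            }
        });
        worker.setDaemon(true);
        worker.start();

        return new Result() {
            @Override
            public State getState() {
                return state.get();
            }

            @Override
            public State cancel() {
                // Only a running job can be cancelled; terminal states are kept.
                if (state.compareAndSet(State.RUNNING, State.CANCELLED)) {
                    worker.interrupt();
                    finished.countDown();
                }
                return state.get();
            }

            @Override
            public State waitUntilFinish() {
                try {
                    finished.await(); // block until a terminal state is reached
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
                return state.get();
            }
        };
    }

    public static void main(String[] args) {
        // Equivalent in spirit to p.run().waitUntilFinish().
        Result result = run(() -> System.out.println("pipeline work running"));
        State finalState = result.waitUntilFinish();
        System.out.println("final state: " + finalState
            + ", terminal: " + finalState.isTerminal());
    }
}
```

In this sketch a caller that does not want to block simply keeps the Result
and calls getState() or cancel() later. A runner without such support would
have to throw from these methods, which is the gap the issues below track.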

I have opened the following JIRA issues. Could anyone help with
them?

https://issues.apache.org/jira/browse/BEAM-596
https://issues.apache.org/jira/browse/BEAM-595
https://issues.apache.org/jira/browse/BEAM-593

Thanks
--
Pei




On Tue, Jul 26, 2016 at 10:54 AM, Amit Sela <amitsel...@gmail.com> wrote:

> +1 and Thanks!
>
> On Tue, Jul 26, 2016 at 2:01 AM Robert Bradshaw
> <rober...@google.com.invalid>
> wrote:
>
> > +1, sounds great. Thanks Pei.
> >
> > On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> > > +1 for your proposal Pei
> > >
> > > On Mon, Jul 25, 2016 at 5:54 PM, Pei He <pe...@google.com.invalid>
> > > wrote:
> > >
> > > >> It looks to me like the following points are agreed:
> > > >> (1). Add cancel() and waitUntilFinish() to PipelineResult.
> > > >> (In streaming mode, "all data watermarks have reached infinity" is
> > > >> considered finished.)
> > > >> (2). PipelineRunner.run() should return relatively quickly, as soon as
> > > >> the pipeline/job is started/running. The blocking logic should be left
> > > >> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
> > > >> runners that finish quickly can block run() until the execution is
> > > >> done, so it is cleaner to verify test results after run().)
> > >>
> > > >> I will send out a PR for (1), and create JIRA issues to improve the
> > > >> runners for (2).
> > >>
> > > >> waitToRunning() is controversial, and we have several half-agreed
> > > >> proposals.
> > > >> I will pull them out of this thread, so we can close this proposal
> > > >> with cancel() and waitUntilFinish(). And I will create a JIRA issue
> > > >> to track how to support "waiting until other states".
> > > >>
> > > >> Does that sound good to everyone?
> > >>
> > >> Thanks
> > >> --
> > >> Pei
> > >>
> > >> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
> > >> <rober...@google.com.invalid> wrote:
> > > >> > On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers <bchamb...@apache.org>
> > > >> > wrote:
> > > >> >> This health check seems redundant with just waiting a while and then
> > > >> >> checking on the status, other than returning earlier in the case of
> > > >> >> reaching a terminal state. What about adding:
> > >> >>
> > >> >> /**
> > >> >>  * Returns the state after waiting the specified duration. Will
> > return
> > >> >> earlier if the pipeline
> > >> >>  * reaches a terminal state.
> > >> >>  */
> > >> >> State getStateAfter(Duration duration);
> > >> >>
> > > >> >> This seems to be a useful building block, both for the user's pipeline
> > > >> >> (in case they wanted to build something like wait and then check health)
> > > >> >> and also for the SDK (to implement waitUntilFinished, etc.)
> > >> >
> > > >> > A generic waitFor(Duration) which may return early if a terminal state
> > > >> > is entered seems useful. I don't know that we need a return value
> > > >> > here, given that we can then query the PipelineResult however we want
> > > >> > once this returns. waitUntilFinished is simply
> > > >> > waitFor(InfiniteDuration).
> > >> >
> > > >> >> On Thu, Jul 21, 2016 at 4:11 PM Pei He <pe...@google.com.invalid>
> > > >> >> wrote:
> > >> >>
> > > >> >>> I am not in favor of supporting waiting for every state, or
> > > >> >>> waitUntilState(...).
> > > >> >>> One reason is that PipelineResult.State is not well defined and is
> > > >> >>> not agreed upon across runners.
> > > >> >>> Another reason is that users might not want to wait for a particular
> > > >> >>> state. For example, waitUntilFinish() waits for a terminal state.
> > > >> >>> So, even if runners have different states, we can still define shared
> > > >> >>> properties, such as finished/terminal.
> > >> >
> > >> > +1. Running is an intermediate state that doesn't have an obvious
> > >> > mapping onto all runners, which is another reason it's odd to wait
> > >> > until then. All runners have terminal states.
> > >> >
> > > >> >>> I think when users call waitUntilRunning(), they want to make sure the
> > > >> >>> pipeline is up and running and is healthy. Maybe we want to wait until
> > > >> >>> at least one element has gone through the pipeline.
> > >> >
> > > >> > -1, That might be a while... Also, you may not start generating data
> > > >> > until your pipeline is up.
> > >> >
> > >> >>> What about changing the waitUntilRunning() to the following?
> > >> >>>
> > >> >>> /**
> > >> >>> * Check if the pipeline is health for the duration.
> > >> >>> *
> > >> >>> * Return true if the pipeline is healthy at the end of duration.
> > >> >>> * Return false if the pipeline is not healthy at the end of
> > duration.
> > >> >>> * <p>It may return early if the pipeline is in an unrecoverable
> > failure
> > >> >>> state.
> > >> >>> */
> > >> >>> boolean PipelineResult.healthCheck(Duration duration)
> > >> >>>
> > > >> >>> (I think this also addresses Robert's comment about waitToRunning())
> > >> >>>
> > > >> >>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles
> > > >> >>> <k...@google.com.invalid> wrote:
> > >> >>> > Some more comments:
> > >> >>> >
> > > >> >>> >  - What are the allowed/expected state transitions prior to RUNNING?
> > > >> >>> > Today, I presume it is any nonterminal state, so it can be UNKNOWN or
> > > >> >>> > STOPPED (which really means "not yet started") prior to RUNNING. Is
> > > >> >>> > this what we want?
> > > >> >>> >
> > > >> >>> >  - If a job can be paused, a transition from RUNNING to STOPPED, then
> > > >> >>> > waitUntilPaused(Duration) makes sense.
> > > >> >>> >
> > > >> >>> >  - Assuming there is some polling under the hood, are runners required
> > > >> >>> > to send back a full history of transitions? Or can transitions be
> > > >> >>> > missed, with only the latest state retrieved?
> > > >> >>> >
> > > >> >>> >  - If the latter, then does waitUntilRunning() only wait until RUNNING,
> > > >> >>> > or does it also return when it sees STOPPED, which could certainly
> > > >> >>> > indicate that the job transitioned to RUNNING then STOPPED in between
> > > >> >>> > polls? In that case it is, today, the same as waitUntilStateIsKnown().
> > > >> >>> >
> > > >> >>> >  - The obvious limit of this discussion is waitUntilState(Duration,
> > > >> >>> > Set<State>), which is the same amount of work to implement. Am I
> > > >> >>> > correct that everyone in this thread thinks this generality is just
> > > >> >>> > not the right thing for a user API?
> > > >> >>> >
> > > >> >>> >  - This enum could probably use revision. I'd choose some combination
> > > >> >>> > of tightening the enum, making it extensible, and making some aspect
> > > >> >>> > of it free-form. Not sure where the best balance lies.
> > >> >>> >
> > >> >>> >
> > >> >>> >
> > > >> >>> > On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers
> > > >> >>> > <bchamb...@google.com.invalid> wrote:
> > >> >>> >
> > > >> >>> >> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning rather
> > > >> >>> >> than waitToRunning which reads oddly)
> > > >> >>> >>
> > > >> >>> >> The only reason to separate submission from waitUntilRunning would be
> > > >> >>> >> if you wanted to kick off several pipelines in quick succession, then
> > > >> >>> >> wait for them all to be running. For instance:
> > >> >>> >>
> > >> >>> >> PipelineResult p1Future = p1.run();
> > >> >>> >> PipelineResult p2Future = p2.run();
> > >> >>> >> ...
> > >> >>> >>
> > >> >>> >> p1Future.waitUntilRunning();
> > >> >>> >> p2Future.waitUntilRunning();
> > >> >>> >> ...
> > >> >>> >>
> > > >> >>> >> In this setup, you can more quickly start several pipelines, but your
> > > >> >>> >> main program would wait and report any errors before exiting.
> > >> >>> >>
> > >> >>> >> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
> > >> >>> >> <rober...@google.com.invalid> wrote:
> > >> >>> >>
> > > >> >>> >> > I'm in favor of the proposal. My only question is whether we need
> > > >> >>> >> > PipelineResult.waitToRunning(); instead I'd propose that run() block
> > > >> >>> >> > until the pipeline's running/successfully submitted (or failed).
> > > >> >>> >> > This would simplify the API--we'd only have one kind of wait that
> > > >> >>> >> > makes sense in all cases.
> > > >> >>> >> >
> > > >> >>> >> > What kinds of interactions would one want to have with the
> > > >> >>> >> > PipelineResults before it's running?
> > >> >>> >> >
> > > >> >>> >> > On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh
> > > >> >>> >> > <tg...@google.com.invalid> wrote:
> > > >> >>> >> > > TestPipeline is probably the one runner that can be expected to
> > > >> >>> >> > > block, as certainly JUnit tests and likely other tests will run the
> > > >> >>> >> > > Pipeline, and succeed, even if the PipelineRunner throws an
> > > >> >>> >> > > exception. Luckily, this can be added to TestPipeline.run(), which
> > > >> >>> >> > > already has additional behavior associated with it (currently
> > > >> >>> >> > > regarding the unwrapping of AssertionErrors)
> > >> >>> >> > >
> > > >> >>> >> > > On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles
> > > >> >>> >> > > <k...@google.com.invalid> wrote:
> > >> >>> >> > >
> > > >> >>> >> > >> I like this proposal. It makes pipeline.run() seem like a pretty
> > > >> >>> >> > >> normal async request, and easy to program with. It removes the
> > > >> >>> >> > >> implicit assumption in the prior design that main() is pretty much
> > > >> >>> >> > >> just "build and run a pipeline".
> > > >> >>> >> > >>
> > > >> >>> >> > >> The part of this that I care about most is being able to write a
> > > >> >>> >> > >> program (not the pipeline, but the program that launches one or
> > > >> >>> >> > >> more pipelines) that has reasonable cross-runner behavior.
> > >> >>> >> > >>
> > >> >>> >> > >> One comment:
> > >> >>> >> > >>
> > > >> >>> >> > >> On Wed, Jul 20, 2016 at 3:39 PM, Pei He <pe...@google.com.invalid>
> > > >> >>> >> > >> wrote:
> > > >> >>> >> > >> >
> > > >> >>> >> > >> > 4. PipelineRunner.run() should (but is not required to) do
> > > >> >>> >> > >> > non-blocking runs
> > > >> >>> >> > >> >
> > >> >>> >> > >>
> > > >> >>> >> > >> I think we can elaborate on this a little bit. Obviously there might
> > > >> >>> >> > >> be "blocking" in terms of, say, an HTTP round-trip to submit the
> > > >> >>> >> > >> job, but run() should never be non-terminating.
> > > >> >>> >> > >>
> > > >> >>> >> > >> For a test runner that finishes the pipeline quickly, I would be
> > > >> >>> >> > >> fine with run() just executing the pipeline, but the PipelineResult
> > > >> >>> >> > >> should still emulate the usual - just always returning a terminal
> > > >> >>> >> > >> status. It would be annoying to add waitToFinish() to the end of
> > > >> >>> >> > >> all our tests, but leaving a run() makes the tests only work with
> > > >> >>> >> > >> special blocking runner wrappers (and makes them poor examples). A
> > > >> >>> >> > >> JUnit @Rule for test pipeline would hide all that, perhaps.
> > >> >>> >> > >>
> > >> >>> >> > >>
> > >> >>> >> > >> Kenn
> > >> >>> >> > >>
> > >> >>> >> >
> > >> >>> >>
> > >> >>>
> > >>
> >
>
