The Flink runner currently only supports blocking execution. I'll open
a pull request to at least fix waitUntilFinish().
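
For reference, a minimal sketch of the contract discussed in this thread:
run() returns promptly, waitUntilFinish() blocks until a terminal state, and
cancel() forces one. FakeResult, State, and run(long) are hypothetical
stand-ins for illustration only; they are not the Beam API or the Flink
runner's implementation.

```java
import java.util.concurrent.CompletableFuture;

class WaitUntilFinishSketch {
    // Simplified states; the real PipelineResult.State enum has more values.
    enum State { RUNNING, DONE, CANCELLED }

    /** Hypothetical stand-in for PipelineResult (not the Beam interface). */
    static final class FakeResult {
        private final CompletableFuture<State> terminal;

        FakeResult(CompletableFuture<State> terminal) {
            this.terminal = terminal;
        }

        /** Non-blocking: RUNNING until a terminal state has been recorded. */
        State getState() {
            return terminal.getNow(State.RUNNING);
        }

        /** Blocks the caller until the job reaches a terminal state. */
        State waitUntilFinish() {
            return terminal.join();
        }

        /** Records CANCELLED unless the job already finished on its own. */
        State cancel() {
            terminal.complete(State.CANCELLED);
            return terminal.join();
        }
    }

    /** Hypothetical runner: starts the "job" on a daemon thread and returns at once. */
    static FakeResult run(long workMillis) {
        CompletableFuture<State> terminal = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMillis); // simulated pipeline work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            terminal.complete(State.DONE); // no-op if already cancelled
        });
        worker.setDaemon(true);
        worker.start();
        return new FakeResult(terminal);
    }

    public static void main(String[] args) {
        // The caller decides whether to block, as in p.run().waitUntilFinish().
        System.out.println(run(50).waitUntilFinish());

        // A long-running job can be cancelled instead of waited on.
        FakeResult longJob = run(60_000);
        System.out.println(longJob.cancel());
    }
}
```

With this shape, a runner that only supports blocking execution can still
satisfy the interface by returning a result that is already in a terminal
state.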

-Max


On Thu, Oct 13, 2016 at 11:10 AM, Amit Sela <amitsel...@gmail.com> wrote:
> Hi Pei,
>
> I have someone on my team who started to work on this; I'll follow up,
> thanks for the bump ;-)
>
> Amit
>
> On Thu, Oct 13, 2016 at 8:38 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Pei,
>>
>> good one !
>>
>> We now have to update the 'other' runners.
>>
>> Thanks.
>>
>> Regards
>> JB
>>
>> On 10/12/2016 10:48 PM, Pei He wrote:
>> > Hi,
>> > I just want to bump this thread and bring it to attention.
>> >
>> > PipelineResult now has cancel() and waitUntilFinish(). However, currently
>> > only DataflowRunner supports them, in DataflowPipelineJob.
>> >
>> > We agreed that users should call "p.run().waitUntilFinish()" if they want to
>> > block. But if they do that now, the direct, Flink, and Spark runners will
>> > throw exceptions.
>> >
>> > I have opened the following JIRA issues. Could anyone help with them?
>> >
>> > https://issues.apache.org/jira/browse/BEAM-596
>> > https://issues.apache.org/jira/browse/BEAM-595
>> > https://issues.apache.org/jira/browse/BEAM-593
>> >
>> > Thanks
>> > --
>> > Pei
>> >
>> >
>> >
>> >
>> > On Tue, Jul 26, 2016 at 10:54 AM, Amit Sela <amitsel...@gmail.com>
>> wrote:
>> >
>> >> +1 and Thanks!
>> >>
>> >> On Tue, Jul 26, 2016 at 2:01 AM Robert Bradshaw
>> >> <rober...@google.com.invalid>
>> >> wrote:
>> >>
>> >>> +1, sounds great. Thanks Pei.
>> >>>
>> >>> On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik <lc...@google.com.invalid
>> >
>> >>> wrote:
>> >>>> +1 for your proposal Pei
>> >>>>
>> >>>> On Mon, Jul 25, 2016 at 5:54 PM, Pei He <pe...@google.com.invalid>
>> >>> wrote:
>> >>>>
>> >>>>> It looks to me like the following are agreed:
>> >>>>> (1). adding cancel() and waitUntilFinish() to PipelineResult.
>> >>>>> (In streaming mode, "all data watermarks reach infinity" is
>> >>>>> considered finished.)
>> >>>>> (2). PipelineRunner.run() should return relatively quickly, as soon as
>> >>>>> the pipeline/job is started/running. The blocking logic should be left
>> >>>>> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
>> >>>>> runners that finish quickly can block run() until the execution is
>> >>>>> done, so it is cleaner to verify test results after run().)
>> >>>>>
>> >>>>> I will send out a PR for (1), and create JIRA issues to improve runners
>> >>>>> for (2).
>> >>>>>
>> >>>>> waitToRunning() is controversial, and we have several half-agreed
>> >>>>> proposals.
>> >>>>> I will pull them out of this thread, so we can close this proposal
>> >>>>> with cancel() and waitUntilFinish(). And I will create a JIRA issue
>> >>>>> to track how to support "waiting until other states".
>> >>>>>
>> >>>>> Does that sound good to everyone?
>> >>>>>
>> >>>>> Thanks
>> >>>>> --
>> >>>>> Pei
>> >>>>>
>> >>>>> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
>> >>>>> <rober...@google.com.invalid> wrote:
>> >>>>>> On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers <bchamb...@apache.org
>> >>>
>> >>>>> wrote:
>> >>>>>>> This health check seems redundant with just waiting a while and
>> >> then
>> >>>>>>> checking on the status, other than returning earlier in the case of
>> >>>>>>> reaching a terminal state. What about adding:
>> >>>>>>>
>> >>>>>>> /**
>> >>>>>>>  * Returns the state after waiting the specified duration. Will return
>> >>>>>>>  * earlier if the pipeline reaches a terminal state.
>> >>>>>>>  */
>> >>>>>>> State getStateAfter(Duration duration);
>> >>>>>>>
>> >>>>>>> This seems to be a useful building block, both for the user's
>> >>> pipeline
>> >>>>> (in
>> >>>>>>> case they wanted to build something like wait and then check
>> >> health)
>> >>> and
>> >>>>>>> also for the SDK (to implement waitUntilFinished, etc.)
>> >>>>>>
>> >>>>>> A generic waitFor(Duration) which may return early if a terminal state
>> >>>>>> is entered seems useful. I don't know that we need a return value
>> >>>>>> here, given that we can then query the PipelineResult however we want
>> >>>>>> once this returns. waitUntilFinished is simply
>> >>>>>> waitFor(InfiniteDuration).
>> >>>>>>
>> >>>>>>> On Thu, Jul 21, 2016 at 4:11 PM Pei He <pe...@google.com.invalid>
>> >>>>> wrote:
>> >>>>>>>
>> >>>>>>>> I am not in favor of supporting waiting for every state or
>> >>>>>>>> waitUntilState(...).
>> >>>>>>>> One reason is that PipelineResult.State is not well defined and is not
>> >>>>>>>> agreed upon across runners.
>> >>>>>>>> Another reason is that users might not want to wait for a particular
>> >>>>>>>> state. For example,
>> >>>>>>>> waitUntilFinish() waits for a terminal state.
>> >>>>>>>> So, even if runners have different states, we can still define shared
>> >>>>>>>> properties, such as finished/terminal.
>> >>>>>>
>> >>>>>> +1. Running is an intermediate state that doesn't have an obvious
>> >>>>>> mapping onto all runners, which is another reason it's odd to wait
>> >>>>>> until then. All runners have terminal states.
>> >>>>>>
>> >>>>>>>> I think when users call waitUntilRunning(), they want to make sure the
>> >>>>>>>> pipeline is up and running and is healthy.
>> >>>>>>>> Maybe we want to wait until at
>> >>>>>>>> least one element has gone through the pipeline.
>> >>>>>>
>> >>>>>> -1, That might be a while... Also, you may not start generating data
>> >>>>>> until your pipeline is up.
>> >>>>>>
>> >>>>>>>> What about changing the waitUntilRunning() to the following?
>> >>>>>>>>
>> >>>>>>>> /**
>> >>>>>>>> * Check if the pipeline is healthy for the duration.
>> >>>>>>>> *
>> >>>>>>>> * Return true if the pipeline is healthy at the end of duration.
>> >>>>>>>> * Return false if the pipeline is not healthy at the end of duration.
>> >>>>>>>> * <p>It may return early if the pipeline is in an unrecoverable
>> >>>>>>>> * failure state.
>> >>>>>>>> */
>> >>>>>>>> boolean PipelineResult.healthCheck(Duration duration)
>> >>>>>>>>
>> >>>>>>>> (I think this also addressed Robert's comment about
>> >> waitToRunning())
>> >>>>>>>>
>> >>>>>>>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles
>> >>>>> <k...@google.com.invalid>
>> >>>>>>>> wrote:
>> >>>>>>>>> Some more comments:
>> >>>>>>>>>
>> >>>>>>>>>  - What are the allowed/expected state transitions prior to
>> >>> RUNNING?
>> >>>>>>>> Today,
>> >>>>>>>>> I presume it is any nonterminal state, so it can be UNKNOWN or
>> >>>>> STOPPED
>> >>>>>>>>> (which really means "not yet started") prior to RUNNING. Is this
>> >>>>> what we
>> >>>>>>>>> want?
>> >>>>>>>>>
>> >>>>>>>>>  - If a job can be paused, a transition from RUNNING to STOPPED,
>> >>> then
>> >>>>>>>>> waitUntilPaused(Duration) makes sense.
>> >>>>>>>>>
>> >>>>>>>>>  - Assuming there is some polling under the hood, are runners
>> >>>>> required to
>> >>>>>>>>> send back a full history of transitions? Or can transitions be
>> >>>>> missed,
>> >>>>>>>> with
>> >>>>>>>>> only the latest state retrieved?
>> >>>>>>>>>
>> >>>>>>>>>  - If the latter, then does waitUntilRunning() only wait until
>> >>>>> RUNNING or
>> >>>>>>>>> does it also return when it sees STOPPED, which could certainly
>> >>>>> indicate
>> >>>>>>>>> that the job transitioned to RUNNING then STOPPED in between
>> >>> polls.
>> >>>>> In
>> >>>>>>>> that
>> >>>>>>>>> case it is, today, the same as waitUntilStateIsKnown().
>> >>>>>>>>>
>> >>>>>>>>>  - The obvious limit of this discussion is
>> >>> waitUntilState(Duration,
>> >>>>>>>>> Set<State>), which is the same amount of work to implement. Am I
>> >>>>> correct
>> >>>>>>>>> that everyone in this thread thinks this generality is just not
>> >>> the
>> >>>>> right
>> >>>>>>>>> thing for a user API?
>> >>>>>>>>>
>> >>>>>>>>>  - This enum could probably use revision. I'd choose some combination
>> >>>>>>>>> of tightening the enum, making it extensible, and making some aspect
>> >>>>>>>>> of it free-form. Not sure where the best balance lies.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers
>> >>>>>>>> <bchamb...@google.com.invalid
>> >>>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning
>> >>> rather
>> >>>>> than
>> >>>>>>>>>> waitToRunning which reads oddly)
>> >>>>>>>>>>
>> >>>>>>>>>> The only reason to separate submission from waitUntilRunning
>> >>> would
>> >>>>> be if
>> >>>>>>>>>> you wanted to kick off several pipelines in quick succession,
>> >>> then
>> >>>>> wait
>> >>>>>>>> for
>> >>>>>>>>>> them all to be running. For instance:
>> >>>>>>>>>>
>> >>>>>>>>>> PipelineResult p1Future = p1.run();
>> >>>>>>>>>> PipelineResult p2Future = p2.run();
>> >>>>>>>>>> ...
>> >>>>>>>>>>
>> >>>>>>>>>> p1Future.waitUntilRunning();
>> >>>>>>>>>> p2Future.waitUntilRunning();
>> >>>>>>>>>> ...
>> >>>>>>>>>>
>> >>>>>>>>>> In this setup, you can more quickly start several pipelines,
>> >> but
>> >>>>> your
>> >>>>>>>> main
>> >>>>>>>>>> program would wait and report any errors before exiting.
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
>> >>>>>>>>>> <rober...@google.com.invalid> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> I'm in favor of the proposal. My only question is whether we need
>> >>>>>>>>>>> PipelineResult.waitToRunning(); instead, I'd propose that run()
>> >>>>>>>>>>> block until the pipeline's running/successfully submitted (or
>> >>>>>>>>>>> failed). This would simplify the API--we'd only have one kind of
>> >>>>>>>>>>> wait that makes sense in all cases.
>> >>>>>>>>>>>
>> >>>>>>>>>>> What kinds of interactions would one want to have with the
>> >>>>>>>>>>> PipelineResults before it's running?
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh
>> >>>>>>>> <tg...@google.com.invalid>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>> TestPipeline is probably the one runner that can be
>> >> expected
>> >>> to
>> >>>>>>>> block,
>> >>>>>>>>>> as
>> >>>>>>>>>>>> certainly JUnit tests and likely other tests will run the
>> >>>>> Pipeline,
>> >>>>>>>> and
>> >>>>>>>>>>>> succeed, even if the PipelineRunner throws an exception.
>> >>>>> Luckily,
>> >>>>>>>> this
>> >>>>>>>>>>> can
>> >>>>>>>>>>>> be added to TestPipeline.run(), which already has
>> >> additional
>> >>>>>>>> behavior
>> >>>>>>>>>>>> associated with it (currently regarding the unwrapping of
>> >>>>>>>>>>> AssertionErrors)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles
>> >>>>>>>>>> <k...@google.com.invalid
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> I like this proposal. It makes pipeline.run() seem like a
>> >>>>> pretty
>> >>>>>>>>>> normal
>> >>>>>>>>>>>>> async request, and easy to program with. It removes the
>> >>>>> implicit
>> >>>>>>>>>>> assumption
>> >>>>>>>>>>>>> in the prior design that main() is pretty much just "build
>> >>> and
>> >>>>> run
>> >>>>>>>> a
>> >>>>>>>>>>>>> pipeline".
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> The part of this that I care about most is being able to
>> >>> write
>> >>>>> a
>> >>>>>>>>>> program
>> >>>>>>>>>>>>> (not the pipeline, but the program that launches one or
>> >> more
>> >>>>>>>>>> pipelines)
>> >>>>>>>>>>>>> that has reasonable cross-runner behavior.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> One comment:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Wed, Jul 20, 2016 at 3:39 PM, Pei He
>> >>>>> <pe...@google.com.invalid>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 4. PipelineRunner.run() should (but is not required to) do
>> >>>>>>>>>>>>>> non-blocking runs
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I think we can elaborate on this a little bit. Obviously
>> >>> there
>> >>>>>>>> might
>> >>>>>>>>>> be
>> >>>>>>>>>>>>> "blocking" in terms of, say, an HTTP round-trip to submit
>> >>> the
>> >>>>> job,
>> >>>>>>>> but
>> >>>>>>>>>>>>> run() should never be non-terminating.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> For a test runner that finishes the pipeline quickly, I
>> >>> would
>> >>>>> be
>> >>>>>>>> fine
>> >>>>>>>>>>> with
>> >>>>>>>>>>>>> run() just executing the pipeline, but the PipelineResult
>> >>>>> should
>> >>>>>>>> still
>> >>>>>>>>>>>>> emulate the usual - just always returning a terminal
>> >>> status. It
>> >>>>>>>> would
>> >>>>>>>>>> be
>> >>>>>>>>>>>>> annoying to add waitToFinish() to the end of all our
>> >> tests,
>> >>> but
>> >>>>>>>>>> leaving
>> >>>>>>>>>>> a
>> >>>>>>>>>>>>> run() makes the tests only work with special blocking
>> >> runner
>> >>>>>>>> wrappers
>> >>>>>>>>>>> (and
>> >>>>>>>>>>>>> make them poor examples). A JUnit @Rule for test pipeline
>> >>> would
>> >>>>>>>> hide
>> >>>>>>>>>> all
>> >>>>>>>>>>>>> that, perhaps.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Kenn
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>
>> >>>
>> >>
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
