It is well-defined, but in the global window anything that has only 1 pane
is not that useful in streaming pipelines :)

Another well-defined thing: "watermark of the signal + its allowed lateness
passes the end of the main input's window".
Hmm, I think this can be implemented by simply rewindowing the signal into
the main input's WindowFn? Perhaps we should even always do this?
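
Something like this (a rough sketch; 'waitRewindowed' is just an
illustrative helper, Wait.on is the transform from the PR, and the
unchecked cast hand-waves over WindowFn generics):

  import org.apache.beam.sdk.transforms.Wait;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.transforms.windowing.WindowFn;
  import org.apache.beam.sdk.values.PCollection;

  // Force the signal into the main input's windowing so that "final pane
  // of the signal in the same window" is well-defined for the main input.
  static <T, SignalT> PCollection<T> waitRewindowed(
      PCollection<T> main, PCollection<SignalT> signal) {
    @SuppressWarnings("unchecked")
    WindowFn<Object, ?> mainWindowFn =
        (WindowFn<Object, ?>) main.getWindowingStrategy().getWindowFn();
    return main.apply(
        Wait.on(signal.apply("RewindowSignal", Window.<SignalT>into(mainWindowFn))));
  }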

On Fri, Dec 22, 2017 at 8:11 AM Reuven Lax <re...@google.com> wrote:

> I think it is also well-defined for the on-time (watermark) pane, as there
> is always only one such. In general I think sequencing is well defined for
> three panes:
>
> 1. The first pane fired (this is how side inputs are sequenced)
> 2. The first on-time pane, as there is only one.
> 3. The final pane.
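>
> E.g., in terms of PaneInfo (a sketch; assumes we are inside a DoFn's
> @ProcessElement, where the pane is available via ProcessContext.pane()):
>
>   import org.apache.beam.sdk.transforms.DoFn;
>   import org.apache.beam.sdk.transforms.windowing.PaneInfo;
>
>   class DescribePaneFn extends DoFn<String, String> {
>     @ProcessElement
>     public void process(ProcessContext c) {
>       PaneInfo pane = c.pane();
>       if (pane.isFirst()) { /* 1: how side inputs are sequenced */ }
>       if (pane.getTiming() == PaneInfo.Timing.ON_TIME) { /* 2: at most one */ }
>       if (pane.isLast()) { /* 3: the final pane */ }
>     }
>   }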
>
>
>
> On Thu, Dec 21, 2017 at 5:49 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Yeah. And I don't think there's a good way to define what sequencing even
>> means, if the sink is returning results in windows that aren't gonna have a
>> final pane.
>>
>> On Thu, Dec 21, 2017, 2:00 AM Reuven Lax <re...@google.com> wrote:
>>
>>> This is only for "final pane" waiting, correct? So someone who writes a
>>> sink in the global window probably would not want to use this.
>>>
>>> On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> PR is out https://github.com/apache/beam/pull/4301
>>>>
>>>> This should allow us to have useful sequencing for sinks like
>>>> BigtableIO / BigQueryIO.
>>>>
>>>> Adding a couple of interested parties:
>>>> - Steve, would you be interested in using this in
>>>> https://github.com/apache/beam/pull/3997 ?
>>>> - Mairbek: this should help in https://github.com/apache/beam/pull/4264 -
>>>> in particular, this works properly in the case where the input fires
>>>> multiple times.
>>>>
>>>> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> I figured out the Never.ever() approach and it seems to work. Will
>>>>> finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems
>>>>> like this will be quite a useful transform.
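>>>>>
>>>>> For the archive, the shape of the trick (a sketch, not the final PR
>>>>> code; 'signal' is the sink's result collection and SignalT its element
>>>>> type):
>>>>>
>>>>>   import org.apache.beam.sdk.transforms.View;
>>>>>   import org.apache.beam.sdk.transforms.windowing.Never;
>>>>>   import org.apache.beam.sdk.transforms.windowing.Window;
>>>>>   import org.apache.beam.sdk.values.PCollectionView;
>>>>>
>>>>>   // With Never.ever() no speculative panes fire, so each window of the
>>>>>   // signal produces output only when it closes; a side input built from
>>>>>   // it becomes ready (releasing the main input) only at that point.
>>>>>   PCollectionView<Iterable<SignalT>> doneView =
>>>>>       signal
>>>>>           .apply(Window.<SignalT>configure()
>>>>>               .triggering(Never.ever())
>>>>>               .discardingFiredPanes()
>>>>>               .withAllowedLateness(
>>>>>                   signal.getWindowingStrategy().getAllowedLateness()))
>>>>>           .apply(View.asIterable());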
>>>>>
>>>>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I'm a bit confused by all of these suggestions: they sound plausible
>>>>>> at a high level, but I'm having a hard time making any one of them 
>>>>>> concrete.
>>>>>>
>>>>>> So suppose we want to create a transform Wait.on(PCollection<?>
>>>>>> signal): PCollection<T> -> PCollection<T>.
>>>>>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical
>>>>>> to "a", but buffers panes of "a" in any given window until the final pane
>>>>>> of "sig" in the same window is fired (or, if it's never fired, until the
>>>>>> window closes? could use a deadletter for that maybe).
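>>>>>>
>>>>>> Usage would then look something like this (hypothetical names: 'rows'
>>>>>> is the main input of type MyRecord, 'writeDone' is the sink's result
>>>>>> collection, and CopyFn is whatever should run only after the write):
>>>>>>
>>>>>>   import org.apache.beam.sdk.transforms.ParDo;
>>>>>>   import org.apache.beam.sdk.values.PCollection;
>>>>>>
>>>>>>   // Each window of 'rows' is buffered until the final pane of
>>>>>>   // 'writeDone' in the same window has fired.
>>>>>>   PCollection<MyRecord> gated = rows.apply(Wait.on(writeDone));
>>>>>>   gated.apply("AfterWrite", ParDo.of(new CopyFn()));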
>>>>>>
>>>>>> This transform I suppose would need to have a keyed and unkeyed
>>>>>> version.
>>>>>>
>>>>>> The keyed version would support merging window fns, and would require
>>>>>> "a" and "sig" to be keyed by the same key, and would work using a CoGbk -
>>>>>> followed by a stateful ParDo? Or is there a way to get away without a
>>>>>> stateful ParDo here? (not all runners support it)
>>>>>>
>>>>>> The unkeyed version would not support merging window fns. Reuven, can
>>>>>> you elaborate how your combiner idea would work here - in particular,
>>>>>> what do you mean by "triggering only on the final pane"? Do you mean filter
>>>>>> non-final panes before entering the combiner? I wonder if that'll work;
>>>>>> probably worth a shot. And Kenn, can you elaborate on "re-trigger on
>>>>>> the side input with a Never.ever() trigger"?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> This is an interesting point.
>>>>>>>
>>>>>>> In the past, we've often just thought about sequencing some action to
>>>>>>> take place after the sink, in which case you can simply use the sink 
>>>>>>> output
>>>>>>> as a main input. However if you want to run a transform with another
>>>>>>> PCollection as a main input, this doesn't work. And as you've discovered,
>>>>>>> triggered side inputs are defined to be non-deterministic, and there's no
>>>>>>> way to make things line up.
>>>>>>>
>>>>>>> What you're describing only makes sense if you're blocking against
>>>>>>> the final pane (since otherwise there's no reasonable way to match up
>>>>>>> somePc panes with the sink panes). There are multiple ways you can do this:
>>>>>>> one would be to CoGBK the two PCollections together, and trigger the new
>>>>>>> transform only on the final pane. Another would be to add a combiner that
>>>>>>> returns a Void, triggering only on the final pane, and then make this
>>>>>>> singleton Void a side input. You could also do something explicit with the
>>>>>>> state API.
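>>>>>>>
>>>>>>> Sketching the CoGBK variant (assumes both collections are already keyed
>>>>>>> the same way; K, ElementT, DoneT, keyedMain and keyedDone are
>>>>>>> placeholders):
>>>>>>>
>>>>>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>   import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>   import org.apache.beam.sdk.transforms.join.CoGbkResult;
>>>>>>>   import org.apache.beam.sdk.transforms.join.CoGroupByKey;
>>>>>>>   import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
>>>>>>>   import org.apache.beam.sdk.values.KV;
>>>>>>>   import org.apache.beam.sdk.values.TupleTag;
>>>>>>>
>>>>>>>   final TupleTag<ElementT> mainTag = new TupleTag<ElementT>();
>>>>>>>   final TupleTag<DoneT> doneTag = new TupleTag<DoneT>();
>>>>>>>
>>>>>>>   KeyedPCollectionTuple.of(mainTag, keyedMain)
>>>>>>>       .and(doneTag, keyedDone)
>>>>>>>       .apply(CoGroupByKey.<K>create())
>>>>>>>       .apply(ParDo.of(new DoFn<KV<K, CoGbkResult>, ElementT>() {
>>>>>>>         @ProcessElement
>>>>>>>         public void process(ProcessContext c) {
>>>>>>>           if (c.pane().isLast()) { // act only on the final pane
>>>>>>>             for (ElementT e : c.element().getValue().getAll(mainTag)) {
>>>>>>>               c.output(e); // or perform the follow-up action here
>>>>>>>             }
>>>>>>>           }
>>>>>>>         }
>>>>>>>       }));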
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <
>>>>>>> kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> So this appears not as easy as anticipated (surprise!)
>>>>>>>>
>>>>>>>> Suppose we have a PCollection "donePanes" with an element per
>>>>>>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>>>>>>>> data has been written; this pane is: final / non-final".
>>>>>>>>
>>>>>>>> Suppose we want to use this to ensure that
>>>>>>>> somePc.apply(ParDo.of(fn)) happens only after the final pane has been
>>>>>>>> written.
>>>>>>>>
>>>>>>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to
>>>>>>>> happen when c emits a *final* pane.
>>>>>>>>
>>>>>>>> Unfortunately, using
>>>>>>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't
>>>>>>>> do the trick: the side input becomes ready the moment *the first* pane
>>>>>>>> of data has been written.
>>>>>>>>
>>>>>>>> But neither does
>>>>>>>> ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final
>>>>>>>> panes...).apply(View.asSingleton())). It also becomes ready the moment
>>>>>>>> *the first* pane has been written; you just get an exception if you
>>>>>>>> access the side input before the *final* pane was written.
>>>>>>>>
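>>>>>>>> Spelled out (a sketch; WriteStatus and isFinalPane are hypothetical
>>>>>>>> stand-ins for whatever "donePanes" carries):
>>>>>>>>
>>>>>>>>   import org.apache.beam.sdk.transforms.Filter;
>>>>>>>>   import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>   import org.apache.beam.sdk.transforms.View;
>>>>>>>>   import org.apache.beam.sdk.values.PCollectionView;
>>>>>>>>
>>>>>>>>   PCollectionView<WriteStatus> doneView =
>>>>>>>>       donePanes
>>>>>>>>           .apply(Filter.by(status -> status.isFinalPane()))
>>>>>>>>           .apply(View.asSingleton());
>>>>>>>>
>>>>>>>>   // Unblocks as soon as the *first* pane of donePanes fires; accessing
>>>>>>>>   // the view before the final pane throws instead of holding back the
>>>>>>>>   // main input.
>>>>>>>>   somePc.apply(ParDo.of(fn).withSideInputs(doneView));
>>>>>>>>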
>>>>>>>> I can't think of a pure-Beam solution to this: either "donePanes"
>>>>>>>> will be used as a main input to something (and then everything else can
>>>>>>>> only be a side input, which is not general enough), or it will be used as
>>>>>>>> a side input (and then we can't achieve "trigger only after the final pane
>>>>>>>> fires").
>>>>>>>>
>>>>>>>> It seems that we need a way to control the side input pushback, and
>>>>>>>> configure whether a view becomes ready when its first pane has fired or
>>>>>>>> when its last pane has fired. I could see this be a property on the View
>>>>>>>> transform itself. In terms of implementation - I tried to figure out how
>>>>>>>> side input readiness is determined, in the direct runner and Dataflow
>>>>>>>> runner, and I'm completely lost and would appreciate some help.
>>>>>>>>
>>>>>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This sounds great!
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> This would be absolutely great! It seems somewhat similar to the
>>>>>>>>>> changes that were made to the BigQuery sink to support WriteResult (
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
>>>>>>>>>> ).
>>>>>>>>>>
>>>>>>>>>> I find it helpful to think about the different things that may
>>>>>>>>>> come after a sink. For instance:
>>>>>>>>>>
>>>>>>>>>> 1. It might be helpful to have a collection of failing input
>>>>>>>>>> elements. The type of failed elements is pretty straightforward -- just
>>>>>>>>>> the input elements. This allows handling such failures by directing them
>>>>>>>>>> elsewhere or performing additional processing.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> BigQueryIO already does this as you point out.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2. For a sink that produces a series of files, it might be useful
>>>>>>>>>> to have a collection of the file names that have been completely
>>>>>>>>>> written. This allows performing additional handling on these completed
>>>>>>>>>> segments.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In fact we already do this for FileBasedSinks. See
>>>>>>>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 3. For a sink that updates some destination, it would be
>>>>>>>>>> reasonable to have a collection that provides (periodically) output
>>>>>>>>>> indicating how complete the information written to that destination is.
>>>>>>>>>> For instance, this might be something like "<this bigquery table> has all
>>>>>>>>>> of the elements up to <input watermark>" complete. This allows tracking
>>>>>>>>>> how much information has been completely written out.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Interesting. Maybe tough to do since sinks often don't have that
>>>>>>>>> knowledge.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think those concepts map to the more detailed description
>>>>>>>>>> Eugene provided, but I find it helpful to focus on what information
>>>>>>>>>> comes out of the sink and how it might be used.
>>>>>>>>>>
>>>>>>>>>> Were there any use cases the above miss? Any functionality that
>>>>>>>>>> has been described that doesn't map to these use cases?
>>>>>>>>>>
>>>>>>>>>> -- Ben
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <
>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> It makes sense to consider how this maps onto existing kinds of
>>>>>>>>>>> sinks.
>>>>>>>>>>>
>>>>>>>>>>> E.g.:
>>>>>>>>>>> - Something that just makes an RPC per record, e.g.
>>>>>>>>>>> MqttIO.write(): that will emit 1 result per bundle (either a bogus value
>>>>>>>>>>> or number of records written) that will be Combine'd into 1 result per
>>>>>>>>>>> pane of input. A user can sequence against this and be notified when some
>>>>>>>>>>> intermediate amount of data has been written for a window, or (via
>>>>>>>>>>> .isFinal()) when all of it has been written.
>>>>>>>>>>> - Something that e.g. initiates an import job, such as
>>>>>>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic
>>>>>>>>>>> index swap: should emit 1 result per import job, e.g. containing
>>>>>>>>>>> information about the job (e.g. its id and statistics). Role of panes is
>>>>>>>>>>> the same.
>>>>>>>>>>> - Something like above but that supports dynamic destinations:
>>>>>>>>>>> like in WriteFiles, result will be PCollection<KV<DestinationT, ResultT>>
>>>>>>>>>>> where ResultT may be something like a list of files that were written for
>>>>>>>>>>> this pane of this destination.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <
>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I agree that the proper API for enabling the use case "do
>>>>>>>>>>>> something after the data has been written" is to return a PCollection
>>>>>>>>>>>> of objects where each object represents the result of writing some
>>>>>>>>>>>> identifiable subset of the data. Then one can apply a ParDo to this
>>>>>>>>>>>> PCollection, in order to "do something after this subset has been
>>>>>>>>>>>> written".
>>>>>>>>>>>>
>>>>>>>>>>>> The challenging part here is *identifying* the subset of the
>>>>>>>>>>>> data that's been written, in a way consistent with Beam's unified
>>>>>>>>>>>> batch/streaming model, where saying "all data has been written" is not
>>>>>>>>>>>> an option because more data can arrive.
>>>>>>>>>>>>
>>>>>>>>>>>> The next choice is "a window of input has been written", but
>>>>>>>>>>>> then again, late data can arrive into a window as well.
>>>>>>>>>>>>
>>>>>>>>>>>> Next choice after that is "a pane of input has been written",
>>>>>>>>>>>> but per https://s.apache.org/beam-sink-triggers the term "pane of
>>>>>>>>>>>> input" is moot: triggering and panes should be something private to the
>>>>>>>>>>>> sink, and the same input can trigger different sinks differently. The
>>>>>>>>>>>> hypothetical different accumulation modes make this trickier still. I'm
>>>>>>>>>>>> not sure whether we intend to also challenge the idea that windowing is
>>>>>>>>>>>> inherent to the collection, or whether it too should be specified on a
>>>>>>>>>>>> transform that processes the collection. I think for the sake of this
>>>>>>>>>>>> discussion we can assume that it's inherent, and assume the mental model
>>>>>>>>>>>> that the elements in different windows of a PCollection are processed
>>>>>>>>>>>> independently - "as if" there were multiple pipelines processing each
>>>>>>>>>>>> window.
>>>>>>>>>>>>
>>>>>>>>>>>> Overall, embracing the full picture, we end up with something
>>>>>>>>>>>> like this:
>>>>>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or
>>>>>>>>>>>> sliding windows), the below applies to the entire contents of the
>>>>>>>>>>>> PCollection. If it's merging (e.g. session windows), then it applies
>>>>>>>>>>>> per-key, and the input should be (perhaps implicitly) keyed in a way that
>>>>>>>>>>>> the sink understands - for example, the grouping by destination in
>>>>>>>>>>>> DynamicDestinations in file and bigquery writes.
>>>>>>>>>>>> - Each window's contents is a "changelog" - stream of elements
>>>>>>>>>>>> and retractions.
>>>>>>>>>>>> - A "sink" processes each window of the collection, deciding
>>>>>>>>>>>> how to handle elements and retractions (and whether to support
>>>>>>>>>>>> retractions at all) in a sink-specific way, and deciding *when* to
>>>>>>>>>>>> perform the side effects for a portion of the changelog (a "pane") based
>>>>>>>>>>>> on the sink's triggering strategy.
>>>>>>>>>>>> - If the side effect itself is parallelized, then there'll be
>>>>>>>>>>>> multiple results for the pane - e.g. one per bundle.
>>>>>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a
>>>>>>>>>>>> list of filenames that have been written, or simply a number of records
>>>>>>>>>>>> that was written, or a bogus void value etc. The result will implicitly
>>>>>>>>>>>> include the window of the input it's associated with. It will also
>>>>>>>>>>>> implicitly include pane information - index of the pane in this window,
>>>>>>>>>>>> and whether this is the first or last pane.
>>>>>>>>>>>> - The partitioning into bundles is an implementation detail and
>>>>>>>>>>>> not very useful, so before presenting the pane write results to the
>>>>>>>>>>>> user, the sink will probably want to Combine the bundle results so that
>>>>>>>>>>>> there ends up being 1 value for each pane that was written. Once again
>>>>>>>>>>>> note that panes may be associated with windows of the input as a whole,
>>>>>>>>>>>> but if the input is keyed (like with DynamicDestinations) they'll be
>>>>>>>>>>>> associated with per-key subsets of windows of the input.
>>>>>>>>>>>> - This combining requires an extra, well, combining operation,
>>>>>>>>>>>> so it should be optional.
>>>>>>>>>>>> - The user will end up getting either a PCollection<ResultT> or
>>>>>>>>>>>> a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT,
>>>>>>>>>>>> where the elements of this collection will implicitly have window and
>>>>>>>>>>>> pane information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace
>>>>>>>>>>>> the fact that trigger strategy is set on the input. But in that case
>>>>>>>>>>>> the user will have to accept that the PaneInfo of ResultT's is not
>>>>>>>>>>>> necessarily directly related to panes of the input - the sink is allowed
>>>>>>>>>>>> to do internal aggregation as an implementation detail, which may modify
>>>>>>>>>>>> the triggering strategy. Basically the user will still get sink-assigned
>>>>>>>>>>>> panes.
>>>>>>>>>>>> - In most cases, one may imagine that the user is interested in
>>>>>>>>>>>> being notified of "no more data associated with this window will be
>>>>>>>>>>>> written", so the user will ignore all ResultT's except those where 
>>>>>>>>>>>> the pane
>>>>>>>>>>>> is marked final. If a user is interested in being notified of 
>>>>>>>>>>>> intermediate
>>>>>>>>>>>> write results - they'll have to embrace the fact that they cannot 
>>>>>>>>>>>> identify
>>>>>>>>>>>> the precise subset of input associated with the intermediate 
>>>>>>>>>>>> result.
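>>>>>>>>>>>>
>>>>>>>>>>>> E.g., over the hypothetical result collection (ResultT as above),
>>>>>>>>>>>> keeping only final panes - a sketch:
>>>>>>>>>>>>
>>>>>>>>>>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>>>>>>   import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>>>>>   import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>>>>>>>>>
>>>>>>>>>>>>   results.apply("OnWindowFullyWritten", ParDo.of(
>>>>>>>>>>>>       new DoFn<ResultT, Void>() {
>>>>>>>>>>>>         @ProcessElement
>>>>>>>>>>>>         public void process(ProcessContext c, BoundedWindow window) {
>>>>>>>>>>>>           if (c.pane().isLast()) {
>>>>>>>>>>>>             // no more data for 'window' will be written by the sink
>>>>>>>>>>>>           }
>>>>>>>>>>>>         }
>>>>>>>>>>>>       }));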
>>>>>>>>>>>>
>>>>>>>>>>>> I think the really key points of the above are:
>>>>>>>>>>>> - Sinks should support windowed input. Sinks should write
>>>>>>>>>>>> different windows of input independently. If the sink can write
>>>>>>>>>>>> multi-destination input, the destination should function as a grouping
>>>>>>>>>>>> key, and in that case merging windowing should be allowed.
>>>>>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>>>>>> - When asked to produce results, sinks produce a PCollection of
>>>>>>>>>>>> results that may be keyed or unkeyed (per above), and are placed in the
>>>>>>>>>>>> window of the input that was written, and have a PaneInfo assigned by
>>>>>>>>>>>> the sink, of which probably the only part useful to the user is whether
>>>>>>>>>>>> it's .isFinal().
>>>>>>>>>>>>
>>>>>>>>>>>> Does this sound reasonable?
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <
>>>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +1
>>>>>>>>>>>>>
>>>>>>>>>>>>> At the very least an empty PCollection<?> could be produced with no
>>>>>>>>>>>>> promises about its contents but the ability to be followed (e.g. as a
>>>>>>>>>>>>> side input), which is forward compatible with whatever actual metadata
>>>>>>>>>>>>> one may decide to produce in the future.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <
>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>> > +dev@
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to
>>>>>>>>>>>>> > understand and a good way for an IO to communicate and establish
>>>>>>>>>>>>> > causal dependencies. Converting an IO from PDone to real output may
>>>>>>>>>>>>> > spur further useful thoughts based on the design decisions about what
>>>>>>>>>>>>> > sort of output is most useful.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Kenn
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <
>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> I think all sinks actually do have valuable information to output
>>>>>>>>>>>>> >> which can be used after a write (file names, transaction/commit/row
>>>>>>>>>>>>> >> ids, table names, ...). In addition to this metadata, having a
>>>>>>>>>>>>> >> PCollection of all successful writes and all failed writes is useful
>>>>>>>>>>>>> >> for users so they can chain an action which depends on what was or
>>>>>>>>>>>>> >> wasn't successfully written. Users have requested adding
>>>>>>>>>>>>> >> retry/failure handling policies to sinks so that failed writes don't
>>>>>>>>>>>>> >> jam up the pipeline.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <
>>>>>>>>>>>>> chet.aldr...@postmates.com>
>>>>>>>>>>>>> >> wrote:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> So I agree generally with the idea that returning a PCollection
>>>>>>>>>>>>> >>> makes all of this easier so that arbitrary additional functions can
>>>>>>>>>>>>> >>> be added, but what exactly would write functions be returning in a
>>>>>>>>>>>>> >>> PCollection that would make sense? The whole idea is that we've
>>>>>>>>>>>>> >>> written to an external source and now the collection itself is no
>>>>>>>>>>>>> >>> longer needed.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Currently, that's represented with a PDone, which doesn't allow
>>>>>>>>>>>>> >>> any work to occur after it. I see a couple possible ways of
>>>>>>>>>>>>> >>> handling this given this conversation, and am curious which
>>>>>>>>>>>>> >>> solution sounds like the best way to deal with the problem:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> 1. Have output transforms always return something specific (which
>>>>>>>>>>>>> >>> would be the same across transforms by convention), that is in the
>>>>>>>>>>>>> >>> form of a PCollection, so operations can occur after it.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a
>>>>>>>>>>>>> >>> PCollection, so we can run applies afterward.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback
>>>>>>>>>>>>> >>> function which runs after the transform is complete.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> I went through these gymnastics recently when I was trying to
>>>>>>>>>>>>> >>> build something that would move indices after writing to Algolia,
>>>>>>>>>>>>> >>> and the solution was to co-opt code from the old Sink class that
>>>>>>>>>>>>> >>> used to exist in Beam. The problem is that this particular method
>>>>>>>>>>>>> >>> requires the output transform in question to return a PCollection,
>>>>>>>>>>>>> >>> even if it is trivial or doesn't make sense to return one. This
>>>>>>>>>>>>> >>> seems like a bad solution, but unfortunately there isn't a notion
>>>>>>>>>>>>> >>> of a transform that has no explicit output that needs to have
>>>>>>>>>>>>> >>> operations occur after it.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> The three potential solutions above address this issue, but I
>>>>>>>>>>>>> >>> would like to hear which would be preferable (or perhaps a
>>>>>>>>>>>>> >>> different proposal altogether?). Perhaps we could also start up a
>>>>>>>>>>>>> >>> ticket on this, since it seems like a worthwhile feature addition.
>>>>>>>>>>>>> >>> I would find it really useful, for one.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Chet
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is
>>>>>>>>>>>>> >>> returned containing the result of the sink, so that any arbitrary
>>>>>>>>>>>>> >>> additional functions can be applied.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <
>>>>>>>>>>>>> j...@nanthrax.net>
>>>>>>>>>>>>> >>> wrote:
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO rather than
>>>>>>>>>>>>> >>>> in the main.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Regards
>>>>>>>>>>>>> >>>> JB
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO
>>>>>>>>>>>>> >>>>> instead. I have a pull request open here [1] (which reminds me I
>>>>>>>>>>>>> >>>>> need to finish this up...). It would really be nice for most IOs
>>>>>>>>>>>>> >>>>> to support something like this.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the
>>>>>>>>>>>>> >>>>> output from the BigtableIO, and then feed that into your function,
>>>>>>>>>>>>> >>>>> which will run when all writes finish.
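>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> The shape of that pattern, roughly (hypothetical 'writeResults'
>>>>>>>>>>>>> >>>>> output collection and RunAfterWritesFn):
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>   import org.apache.beam.sdk.transforms.Combine;
>>>>>>>>>>>>> >>>>>   import org.apache.beam.sdk.transforms.Count;
>>>>>>>>>>>>> >>>>>   import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>   // The combine collapses write results to one element per
>>>>>>>>>>>>> >>>>>   // window/pane, emitted only once the writes feeding it are
>>>>>>>>>>>>> >>>>>   // done, so the ParDo after it runs "after the writes".
>>>>>>>>>>>>> >>>>>   writeResults
>>>>>>>>>>>>> >>>>>       .apply(Combine.globally(Count.<ResultT>combineFn())
>>>>>>>>>>>>> >>>>>           .withoutDefaults())
>>>>>>>>>>>>> >>>>>       .apply(ParDo.of(new RunAfterWritesFn()));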
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> You probably want to avoid doing something in the main method,
>>>>>>>>>>>>> >>>>> because there's no guarantee it'll actually run (maybe the driver
>>>>>>>>>>>>> >>>>> will die, get killed, machine will explode, etc.).
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick
>>>>>>>>>>>>> >>>>> <nerdyn...@gmail.com> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>     Assuming you're in Java, you could just follow on in your
>>>>>>>>>>>>> >>>>>     main method, checking the state of the result.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>     Example:
>>>>>>>>>>>>> >>>>>     PipelineResult result = pipeline.run();
>>>>>>>>>>>>> >>>>>     try {
>>>>>>>>>>>>> >>>>>       result.waitUntilFinish();
>>>>>>>>>>>>> >>>>>       if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>>>>>>> >>>>>         // do the ES work
>>>>>>>>>>>>> >>>>>       }
>>>>>>>>>>>>> >>>>>     } catch (Exception e) {
>>>>>>>>>>>>> >>>>>       result.cancel();
>>>>>>>>>>>>> >>>>>       throw e;
>>>>>>>>>>>>> >>>>>     }
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>     Otherwise you could also use Oozie to construct a
>>>>>>>>>>>>> >>>>>     workflow.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>>>>>>>> >>>>>     <j...@nanthrax.net> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>         Hi,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>         yes, we had a similar question some days ago.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>         We can imagine having a user callback fn fired when
>>>>>>>>>>>>> >>>>>         the sink batch is complete.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>         Let me think about that.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>         Regards
>>>>>>>>>>>>> >>>>>         JB
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>             Hey JB,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>             Thanks for getting back so quickly.
>>>>>>>>>>>>> >>>>>             I suppose in that case I would need a way of
>>>>>>>>>>>>> >>>>>             monitoring when the ES transform completes
>>>>>>>>>>>>> >>>>>             successfully before I can proceed with doing the
>>>>>>>>>>>>> >>>>>             swap.
>>>>>>>>>>>>> >>>>>             The problem with this is that I can't think of a
>>>>>>>>>>>>> >>>>>             good way to determine that termination state short
>>>>>>>>>>>>> >>>>>             of polling the new index to check the document
>>>>>>>>>>>>> >>>>>             count compared to the size of the input
>>>>>>>>>>>>> >>>>>             PCollection.
>>>>>>>>>>>>> >>>>>             That, or maybe I'd need to use an external system
>>>>>>>>>>>>> >>>>>             like you mentioned to poll on the state of the
>>>>>>>>>>>>> >>>>>             pipeline (I'm using Google Dataflow, so maybe
>>>>>>>>>>>>> >>>>>             there's a way to do this with some API).
>>>>>>>>>>>>> >>>>>             But I would have thought that there would be an
>>>>>>>>>>>>> >>>>>             easy way of simply saying "do not process this
>>>>>>>>>>>>> >>>>>             transform until this other transform completes".
>>>>>>>>>>>>> >>>>>             Is there no established way of "signaling" between
>>>>>>>>>>>>> >>>>>             pipelines when some pipeline completes, or some way
>>>>>>>>>>>>> >>>>>             of declaring a dependency of 1 transform on another
>>>>>>>>>>>>> >>>>>             transform?
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>             Thanks again,
>>>>>>>>>>>>> >>>>>             Philip
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste
>>>>>>>>>>>>> >>>>>             Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                  Hi Philip,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                  You won't be able to do (3) in the same
>>>>>>>>>>>>> >>>>>                  pipeline, as the Elasticsearch Sink PTransform
>>>>>>>>>>>>> >>>>>                  ends the pipeline with PDone.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                  So, (3) has to be done in another pipeline
>>>>>>>>>>>>> >>>>>                  (using a DoFn) or in another "system" (like
>>>>>>>>>>>>> >>>>>                  Camel for instance). I would do a check of the
>>>>>>>>>>>>> >>>>>                  data in the index and then trigger the swap
>>>>>>>>>>>>> >>>>>                  there.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                  Regards
>>>>>>>>>>>>> >>>>>                  JB
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                      Hi,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                      I'm pretty new to Beam, and I've been
>>>>>>>>>>>>> >>>>>                      trying to use the ElasticSearchIO sink to
>>>>>>>>>>>>> >>>>>                      write docs into ES. With this, I want to
>>>>>>>>>>>>> >>>>>                      be able to
>>>>>>>>>>>>> >>>>>                      1. ingest and transform rows from DB (done)
>>>>>>>>>>>>> >>>>>                      2. write JSON docs/strings into a new ES
>>>>>>>>>>>>> >>>>>                      index (done)
>>>>>>>>>>>>> >>>>>                      3. After (2) is complete and all documents
>>>>>>>>>>>>> >>>>>                      are written into a new index, trigger an
>>>>>>>>>>>>> >>>>>                      atomic index swap under an alias to replace
>>>>>>>>>>>>> >>>>>                      the current aliased index with the new
>>>>>>>>>>>>> >>>>>                      index generated in step 2. This is
>>>>>>>>>>>>> >>>>>                      basically a single POST request to the ES
>>>>>>>>>>>>> >>>>>                      cluster.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                      The problem I'm facing is that I don't
>>>>>>>>>>>>> >>>>>                      seem to be able to find a way for (3) to
>>>>>>>>>>>>> >>>>>                      happen after step (2) is complete.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                      The ElasticSearchIO.Write transform
>>>>>>>>>>>>> >>>>>                      returns a PDone, and I'm not sure how to
>>>>>>>>>>>>> >>>>>                      proceed from there because it doesn't seem
>>>>>>>>>>>>> >>>>>                      to let me do another apply on it to
>>>>>>>>>>>>> >>>>>                      "define" a dependency.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                      Is there a recommended way to construct
>>>>>>>>>>>>> >>>>>                      pipeline workflows like this?
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                      Thanks in advance,
>>>>>>>>>>>>> >>>>>                      Philip
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>                  --
>>>>>>>>>>>>> >>>>>                  Jean-Baptiste Onofré
>>>>>>>>>>>>> >>>>>                  jbono...@apache.org
>>>>>>>>>>>>> >>>>>                  http://blog.nanthrax.net
>>>>>>>>>>>>> >>>>>                  Talend - http://www.talend.com
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>         --
>>>>>>>>>>>>> >>>>>         Jean-Baptiste Onofré
>>>>>>>>>>>>> >>>>>         jbono...@apache.org
>>>>>>>>>>>>> >>>>>         http://blog.nanthrax.net
>>>>>>>>>>>>> >>>>>         Talend - http://www.talend.com
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>     --
>>>>>>>>>>>>> >>>>>     Nick Verbeck - NerdyNick
>>>>>>>>>>>>> >>>>>     ----------------------------------------------------
>>>>>>>>>>>>> >>>>>     NerdyNick.com
>>>>>>>>>>>>> >>>>>     TrailsOffroad.com
>>>>>>>>>>>>> >>>>>     NoKnownBoundaries.com
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> --
>>>>>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> >>>> jbono...@apache.org
>>>>>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>>>>>> >>>> Talend - http://www.talend.com
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>
>>>
>
