PR is out https://github.com/apache/beam/pull/4301

This should allow us to have useful sequencing for sinks like BigtableIO /
BigQueryIO.

Adding a couple of interested parties:
- Steve, would you be interested in using this in
https://github.com/apache/beam/pull/3997 ?
- Mairbek: this should help in https://github.com/apache/beam/pull/4264 -
in particular, this works properly in case the input can be firing multiple
times.

On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> I figured out the Never.ever() approach and it seems to work. Will finish
> this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this
> will be quite a useful transform.
>
> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> I'm a bit confused by all of these suggestions: they sound plausible at a
>> high level, but I'm having a hard time making any one of them concrete.
>>
>> So suppose we want to create a transform Wait.on(PCollection<?> signal):
>> PCollection<T> -> PCollection<T>.
>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
>> "a", but buffers panes of "a" in any given window until the final pane of
>> "sig" in the same window is fired (or, if it's never fired, until the
>> window closes? could use a deadletter for that maybe).
>>
>> This transform I suppose would need to have a keyed and unkeyed version.
>>
>> The keyed version would support merging window fns, and would require "a"
>> and "sig" to be keyed by the same key, and would work using a CoGbk -
>> followed by a stateful ParDo? Or is there a way to get away without a
>> stateful ParDo here? (not all runners support it)
>>
>> The unkeyed version would not support merging window fns. Reuven, can you
>> elaborate how your combiner idea would work here - in particular, what do
>> you mean by "triggering only on the final pane"? Do you mean filter
>> non-final panes before entering the combiner? I wonder if that'll work,
>> probably worth a shot. And Kenn, can you elaborate on "re-trigger on the
>> side input with a Never.ever() trigger"?
>>
>> Thanks.
>>
>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>
>>> This is an interesting point.
>>>
>>> In the past, we've often just though about sequencing some action to
>>> take place after the sink, in which case you can simply use the sink output
>>> as a main input. However if you want to run a transform with another
>>> PCollection as a main input, this doesn't work. And as you've discovered,
>>> triggered side inputs are defined to be non-deterministic, and there's no
>>> way to make things line up.
>>>
>>> What you're describing only makes sense if you're blocking against the
>>> final pane (since otherwise there's no reasonable way to match up somePC
>>> panes with the sink panes). There are multiple ways you can do this: one
>>> would be to CGBK the two PCollections together, and trigger the new
>>> transform only on the final pane. Another would be to add a combiner that
>>> returns a Void, triggering only on the final pane, and then make this
>>> singleton Void a side input. You could also do something explicit with the
>>> state API.
>>>
>>> Reuven
>>>
>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> So this appears not as easy as anticipated (surprise!)
>>>>
>>>> Suppose we have a PCollection "donePanes" with an element per
>>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>>>> data has been written; this pane is: final / non-final".
>>>>
>>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>>>> happens only after the final pane has been written.
>>>>
>>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
>>>> when c emits a *final* pane.
>>>>
>>>> Unfortunately, using
>>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
>>>> the trick: the side input becomes ready the moment *the first *pane of
>>>> data has been written.
>>>>
>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>>>> only final panes...).apply(View.asSingleton())). It also becomes ready the
>>>> moment *the first* pane has been written, you just get an exception if
>>>> you access the side input before the *final* pane was written.
>>>>
>>>> I can't think of a pure-Beam solution to this: either "donePanes" will
>>>> be used as a main input to something (and then everything else can only be
>>>> a side input, which is not general enough), or it will be used as a side
>>>> input (and then we can't achieve "trigger only after the final pane 
>>>> fires").
>>>>
>>>> It seems that we need a way to control the side input pushback, and
>>>> configure whether a view becomes ready when its first pane has fired or
>>>> when its last pane has fired. I could see this be a property on the View
>>>> transform itself. In terms of implementation - I tried to figure out how
>>>> side input readiness is determined, in the direct runner and Dataflow
>>>> runner, and I'm completely lost and would appreciate some help.
>>>>
>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> This sounds great!
>>>>>
>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> This would be absolutely great! It seems somewhat similar to the
>>>>>> changes that were made to the BigQuery sink to support WriteResult (
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
>>>>>> ).
>>>>>>
>>>>>> I find it helpful to think about the different things that may come
>>>>>> after a sink. For instance:
>>>>>>
>>>>>> 1. It might be helpful to have a collection of failing input
>>>>>> elements. The type of failed elements is pretty straightforward -- just 
>>>>>> the
>>>>>> input elements. This allows handling such failures by directing them
>>>>>> elsewhere or performing additional processing.
>>>>>>
>>>>>
>>>>> BigQueryIO already does this as you point out.
>>>>>
>>>>>>
>>>>>> 2. For a sink that produces a series of files, it might be useful to
>>>>>> have a collection of the file names that have been completely written. 
>>>>>> This
>>>>>> allows performing additional handling on these completed segments.
>>>>>>
>>>>>
>>>>> In fact we already do this for FileBasedSinks.   See
>>>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>
>>>>>>
>>>>>> 3. For a sink that updates some destination, it would be reasonable
>>>>>> to have a collection that provides (periodically) output indicating how
>>>>>> complete the information written to that destination is. For instance, 
>>>>>> this
>>>>>> might be something like "<this bigquery table> has all of the elements up
>>>>>> to <input watermark>" complete. This allows tracking how much information
>>>>>> has been completely written out.
>>>>>>
>>>>>
>>>>> Interesting. Maybe tough to do since sinks often don't have that
>>>>> knowledge.
>>>>>
>>>>>
>>>>>>
>>>>>> I think those concepts map to the more detailed description Eugene
>>>>>> provided, but I find it helpful to focus on what information comes out of
>>>>>> the sink and how it might be used.
>>>>>>
>>>>>> Were there any use cases the above miss? Any functionality that has
>>>>>> been described that doesn't map to these use cases?
>>>>>>
>>>>>> -- Ben
>>>>>>
>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> It makes sense to consider how this maps onto existing kinds of
>>>>>>> sinks.
>>>>>>>
>>>>>>> E.g.:
>>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write():
>>>>>>> that will emit 1 result per bundle (either a bogus value or number of
>>>>>>> records written) that will be Combine'd into 1 result per pane of 
>>>>>>> input. A
>>>>>>> user can sequence against this and be notified when some intermediate
>>>>>>> amount of data has been written for a window, or (via .isFinal()) when 
>>>>>>> all
>>>>>>> of it has been written.
>>>>>>> - Something that e.g. initiates an import job, such as
>>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic
>>>>>>> index swap: should emit 1 result per import job, e.g. containing
>>>>>>> information about the job (e.g. its id and statistics). Role of panes is
>>>>>>> the same.
>>>>>>> - Something like above but that supports dynamic destinations: like
>>>>>>> in WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> 
>>>>>>> where
>>>>>>> ResultT may be something like a list of files that were written for this
>>>>>>> pane of this destination.
>>>>>>>
>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <
>>>>>>> kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> I agree that the proper API for enabling the use case "do something
>>>>>>>> after the data has been written" is to return a PCollection of objects
>>>>>>>> where each object represents the result of writing some identifiable 
>>>>>>>> subset
>>>>>>>> of the data. Then one can apply a ParDo to this PCollection, in order 
>>>>>>>> to
>>>>>>>> "do something after this subset has been written".
>>>>>>>>
>>>>>>>> The challenging part here is *identifying* the subset of the data
>>>>>>>> that's been written, in a way consistent with Beam's unified
>>>>>>>> batch/streaming model, where saying "all data has been written" is not 
>>>>>>>> an
>>>>>>>> option because more data can arrive.
>>>>>>>>
>>>>>>>> The next choice is "a window of input has been written", but then
>>>>>>>> again, late data can arrive into a window as well.
>>>>>>>>
>>>>>>>> Next choice after that is "a pane of input has been written", but
>>>>>>>> per https://s.apache.org/beam-sink-triggers the term "pane of
>>>>>>>> input" is moot: triggering and panes should be something private to the
>>>>>>>> sink, and the same input can trigger different sinks differently. The
>>>>>>>> hypothetical different accumulation modes make this trickier still. 
>>>>>>>> I'm not
>>>>>>>> sure whether we intend to also challenge the idea that windowing is
>>>>>>>> inherent to the collection, or whether it too should be specified on a
>>>>>>>> transform that processes the collection. I think for the sake of this
>>>>>>>> discussion we can assume that it's inherent, and assume the mental 
>>>>>>>> model
>>>>>>>> that the elements in different windows of a PCollection are processed
>>>>>>>> independently - "as if" there were multiple pipelines processing each
>>>>>>>> window.
>>>>>>>>
>>>>>>>> Overall, embracing the full picture, we end up with something like
>>>>>>>> this:
>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>>>>>>>> windows), the below applies to the entire contents of the PCollection. 
>>>>>>>> If
>>>>>>>> it's merging (e.g. session windows), then it applies per-key, and the 
>>>>>>>> input
>>>>>>>> should be (perhaps implicitly) keyed in a way that the sink 
>>>>>>>> understands -
>>>>>>>> for example, the grouping by destination in DynamicDestinations in 
>>>>>>>> file and
>>>>>>>> bigquery writes.
>>>>>>>> - Each window's contents is a "changelog" - stream of elements and
>>>>>>>> retractions.
>>>>>>>> - A "sink" processes each window of the collection, deciding how to
>>>>>>>> handle elements and retractions (and whether to support retractions at 
>>>>>>>> all)
>>>>>>>> in a sink-specific way, and deciding *when* to perform the side 
>>>>>>>> effects for
>>>>>>>> a portion of the changelog (a "pane") based on the sink's triggering
>>>>>>>> strategy.
>>>>>>>> - If the side effect itself is parallelized, then there'll be
>>>>>>>> multiple results for the pane - e.g. one per bundle.
>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of
>>>>>>>> filenames that have been written, or simply a number of records that 
>>>>>>>> was
>>>>>>>> written, or a bogus void value etc. The result will implicitly include 
>>>>>>>> the
>>>>>>>> window of the input it's associated with. It will also implicitly 
>>>>>>>> include
>>>>>>>> pane information - index of the pane in this window, and whether this 
>>>>>>>> is
>>>>>>>> the first or last pane.
>>>>>>>> - The partitioning into bundles is an implementation detail and not
>>>>>>>> very useful, so before presenting the pane write results to the user, 
>>>>>>>> the
>>>>>>>> sink will probably want to Combine the bundle results so that there 
>>>>>>>> ends up
>>>>>>>> being 1 value for each pane that was written. Once again note that 
>>>>>>>> panes
>>>>>>>> may be associated with windows of the input as a whole, but if the 
>>>>>>>> input is
>>>>>>>> keyed (like with DynamicDestinations) they'll be associated with 
>>>>>>>> per-key
>>>>>>>> subsets of windows of the input.
>>>>>>>> - This combining requires an extra, well, combining operation, so
>>>>>>>> it should be optional.
>>>>>>>> - The user will end up getting either a PCollection<ResultT> or a
>>>>>>>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, 
>>>>>>>> where
>>>>>>>> the elements of this collection will implicitly have window and pane
>>>>>>>> information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace the
>>>>>>>> fact that trigger strategy is set on the input. But in that case the 
>>>>>>>> user
>>>>>>>> will have to accept that the PaneInfo of ResultT's is not necessarily
>>>>>>>> directly related to panes of the input - the sink is allowed to do 
>>>>>>>> internal
>>>>>>>> aggregation as an implementation detail, which may modify the 
>>>>>>>> triggering
>>>>>>>> strategy. Basically the user will still get sink-assigned panes.
>>>>>>>> - In most cases, one may imagine that the user is interested in
>>>>>>>> being notified of "no more data associated with this window will be
>>>>>>>> written", so the user will ignore all ResultT's except those where the 
>>>>>>>> pane
>>>>>>>> is marked final. If a user is interested in being notified of 
>>>>>>>> intermediate
>>>>>>>> write results - they'll have to embrace the fact that they cannot 
>>>>>>>> identify
>>>>>>>> the precise subset of input associated with the intermediate result.
>>>>>>>>
>>>>>>>> I think the really key points of the above are:
>>>>>>>> - Sinks should support windowed input. Sinks should write different
>>>>>>>> windows of input independently. If the sink can write multi-destination
>>>>>>>> input, the destination should function as a grouping key, and in that 
>>>>>>>> case
>>>>>>>> merging windowing should be allowed.
>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>> - When asked to produce results, sinks produce a PCollection of
>>>>>>>> results that may be keyed or unkeyed (per above), and are placed in the
>>>>>>>> window of the input that was written, and have a PaneInfo assigned by 
>>>>>>>> the
>>>>>>>> sink, of which probably the only part useful to the user is whether 
>>>>>>>> it's
>>>>>>>> .isFinal().
>>>>>>>>
>>>>>>>> Does this sound reasonable?
>>>>>>>>
>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <
>>>>>>>> rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> +1
>>>>>>>>>
>>>>>>>>> At the very least an empty PCollection<?> could be produced with no
>>>>>>>>> promises about its contents but the ability to be followed (e.g.
>>>>>>>>> as a
>>>>>>>>> side input), which is forward compatible with whatever actual
>>>>>>>>> metadata
>>>>>>>>> one may decide to produce in the future.
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com>
>>>>>>>>> wrote:
>>>>>>>>> > +dev@
>>>>>>>>> >
>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy
>>>>>>>>> to
>>>>>>>>> > understand and a good way for an IO to communicate and establish
>>>>>>>>> causal
>>>>>>>>> > dependencies. Converting an IO from PDone to real output may
>>>>>>>>> spur further
>>>>>>>>> > useful thoughts based on the design decisions about what sort of
>>>>>>>>> output is
>>>>>>>>> > most useful.
>>>>>>>>> >
>>>>>>>>> > Kenn
>>>>>>>>> >
>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>> >>
>>>>>>>>> >> I think all sinks actually do have valuable information to
>>>>>>>>> output which
>>>>>>>>> >> can be used after a write (file names, transaction/commit/row
>>>>>>>>> ids, table
>>>>>>>>> >> names, ...). In addition to this metadata, having a PCollection
>>>>>>>>> of all
>>>>>>>>> >> successful writes and all failed writes is useful for users so
>>>>>>>>> they can
>>>>>>>>> >> chain an action which depends on what was or wasn't
>>>>>>>>> successfully written.
>>>>>>>>> >> Users have requested adding retry/failure handling policies to
>>>>>>>>> sinks so that
>>>>>>>>> >> failed writes don't jam up the pipeline.
>>>>>>>>> >>
>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <
>>>>>>>>> chet.aldr...@postmates.com>
>>>>>>>>> >> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> So I agree generally with the idea that returning a
>>>>>>>>> PCollection makes all
>>>>>>>>> >>> of this easier so that arbitrary additional functions can be
>>>>>>>>> added, what
>>>>>>>>> >>> exactly would write functions be returning in a PCollection
>>>>>>>>> that would make
>>>>>>>>> >>> sense? The whole idea is that we’ve written to an external
>>>>>>>>> source and now
>>>>>>>>> >>> the collection itself is no longer needed.
>>>>>>>>> >>>
>>>>>>>>> >>> Currently, that’s represented with a PDone, but currently that
>>>>>>>>> doesn’t
>>>>>>>>> >>> allow any work to occur after it. I see a couple possible ways
>>>>>>>>> of handling
>>>>>>>>> >>> this given this conversation, and am curious which solution
>>>>>>>>> sounds like the
>>>>>>>>> >>> best way to deal with the problem:
>>>>>>>>> >>>
>>>>>>>>> >>> 1. Have output transforms always return something specific
>>>>>>>>> (which would
>>>>>>>>> >>> be the same across transforms by convention), that is in the
>>>>>>>>> form of a
>>>>>>>>> >>> PCollection, so operations can occur after it.
>>>>>>>>> >>>
>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a
>>>>>>>>> PCollection so we
>>>>>>>>> >>> can run applies afterward.
>>>>>>>>> >>>
>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback
>>>>>>>>> function
>>>>>>>>> >>> which runs after the transform is complete.
>>>>>>>>> >>>
>>>>>>>>> >>> I went through these gymnastics recently when I was trying to
>>>>>>>>> build
>>>>>>>>> >>> something that would move indices after writing to Algolia,
>>>>>>>>> and the solution
>>>>>>>>> >>> was to co-opt code from the old Sink class that used to exist
>>>>>>>>> in Beam. The
>>>>>>>>> >>> problem is that particular method requires the output
>>>>>>>>> transform in question
>>>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t make
>>>>>>>>> sense to
>>>>>>>>> >>> return one. This seems like a bad solution, but unfortunately
>>>>>>>>> there isn’t a
>>>>>>>>> >>> notion of a transform that has no explicit output that needs
>>>>>>>>> to have
>>>>>>>>> >>> operations occur after it.
>>>>>>>>> >>>
>>>>>>>>> >>> The three potential solutions above address this issue, but I
>>>>>>>>> would like
>>>>>>>>> >>> to hear on which would be preferable (or perhaps a different
>>>>>>>>> proposal
>>>>>>>>> >>> altogether?). Perhaps we could also start up a ticket on this,
>>>>>>>>> since it
>>>>>>>>> >>> seems like a worthwhile feature addition. I would find it
>>>>>>>>> really useful, for
>>>>>>>>> >>> one.
>>>>>>>>> >>>
>>>>>>>>> >>> Chet
>>>>>>>>> >>>
>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Instead of a callback fn, its most useful if a PCollection is
>>>>>>>>> returned
>>>>>>>>> >>> containing the result of the sink so that any arbitrary
>>>>>>>>> additional functions
>>>>>>>>> >>> can be applied.
>>>>>>>>> >>>
>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <
>>>>>>>>> j...@nanthrax.net>
>>>>>>>>> >>> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than
>>>>>>>>> in the
>>>>>>>>> >>>> main.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Regards
>>>>>>>>> >>>> JB
>>>>>>>>> >>>>
>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO
>>>>>>>>> instead.
>>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I need
>>>>>>>>> to finish this
>>>>>>>>> >>>>> up...).  It would really be nice for most IOs to support
>>>>>>>>> something like
>>>>>>>>> >>>>> this.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the
>>>>>>>>> output from
>>>>>>>>> >>>>> the BigtableIO, and then feed that into your function which
>>>>>>>>> will run when
>>>>>>>>> >>>>> all writes finish.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> You probably want to avoid doing something in the main
>>>>>>>>> method because
>>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver
>>>>>>>>> will die, get
>>>>>>>>> >>>>> killed, machine will explode, etc).
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <
>>>>>>>>> nerdyn...@gmail.com
>>>>>>>>> >>>>> <mailto:nerdyn...@gmail.com>> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>     Assuming you're in Java. You could just follow on in
>>>>>>>>> your Main
>>>>>>>>> >>>>> method.
>>>>>>>>> >>>>>     Checking the state of the Result.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>     Example:
>>>>>>>>> >>>>>     PipelineResult result = pipeline.run();
>>>>>>>>> >>>>>     try {
>>>>>>>>> >>>>>     result.waitUntilFinish();
>>>>>>>>> >>>>>     if(result.getState() == PipelineResult.State.DONE) {
>>>>>>>>> >>>>>     //DO ES work
>>>>>>>>> >>>>>     }
>>>>>>>>> >>>>>     } catch(Exception e) {
>>>>>>>>> >>>>>     result.cancel();
>>>>>>>>> >>>>>     throw e;
>>>>>>>>> >>>>>     }
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>     Otherwise you could also use Oozie to construct a work
>>>>>>>>> flow.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>>>> >>>>> <j...@nanthrax.net
>>>>>>>>> >>>>>     <mailto:j...@nanthrax.net>> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>         Hi,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>         yes, we had a similar question some days ago.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>         We can imagine to have a user callback fn fired when
>>>>>>>>> the sink
>>>>>>>>> >>>>> batch is
>>>>>>>>> >>>>>         complete.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>         Let me think about that.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>         Regards
>>>>>>>>> >>>>>         JB
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>             Hey JB,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>             Thanks for getting back so quickly.
>>>>>>>>> >>>>>             I suppose in that case I would need a way of
>>>>>>>>> monitoring
>>>>>>>>> >>>>> when the ES
>>>>>>>>> >>>>>             transform completes successfully before I can
>>>>>>>>> proceed with
>>>>>>>>> >>>>> doing the
>>>>>>>>> >>>>>             swap.
>>>>>>>>> >>>>>             The problem with this is that I can't think of a
>>>>>>>>> good way
>>>>>>>>> >>>>> to
>>>>>>>>> >>>>>             determine that termination state short of
>>>>>>>>> polling the new
>>>>>>>>> >>>>> index to
>>>>>>>>> >>>>>             check the document count compared to the size of
>>>>>>>>> input
>>>>>>>>> >>>>> PCollection.
>>>>>>>>> >>>>>             That, or maybe I'd need to use an external
>>>>>>>>> system like you
>>>>>>>>> >>>>> mentioned
>>>>>>>>> >>>>>             to poll on the state of the pipeline (I'm using
>>>>>>>>> Google
>>>>>>>>> >>>>> Dataflow, so
>>>>>>>>> >>>>>             maybe there's a way to do this with some API).
>>>>>>>>> >>>>>             But I would have thought that there would be an
>>>>>>>>> easy way of
>>>>>>>>> >>>>> simply
>>>>>>>>> >>>>>             saying "do not process this transform until this
>>>>>>>>> other
>>>>>>>>> >>>>> transform
>>>>>>>>> >>>>>             completes".
>>>>>>>>> >>>>>             Is there no established way of "signaling"
>>>>>>>>> between
>>>>>>>>> >>>>> pipelines when
>>>>>>>>> >>>>>             some pipeline completes, or have some way of
>>>>>>>>> declaring a
>>>>>>>>> >>>>> dependency
>>>>>>>>> >>>>>             of 1 transform on another transform?
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>             Thanks again,
>>>>>>>>> >>>>>             Philip
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste
>>>>>>>>> Onofré
>>>>>>>>> >>>>>             <j...@nanthrax.net <mailto:j...@nanthrax.net>
>>>>>>>>> >>>>> <mailto:j...@nanthrax.net
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>             <mailto:j...@nanthrax.net>>> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                  Hi Philip,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                  You won't be able to do (3) in the same
>>>>>>>>> pipeline as
>>>>>>>>> >>>>> the
>>>>>>>>> >>>>>             Elasticsearch Sink
>>>>>>>>> >>>>>                  PTransform ends the pipeline with PDone.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                  So, (3) has to be done in another pipeline
>>>>>>>>> (using a
>>>>>>>>> >>>>> DoFn) or in
>>>>>>>>> >>>>>             another
>>>>>>>>> >>>>>                  "system" (like Camel for instance). I would
>>>>>>>>> do a check
>>>>>>>>> >>>>> of the
>>>>>>>>> >>>>>             data in the
>>>>>>>>> >>>>>                  index and then trigger the swap there.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                  Regards
>>>>>>>>> >>>>>                  JB
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                      Hi,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                      I'm pretty new to Beam, and I've been
>>>>>>>>> trying to
>>>>>>>>> >>>>> use the
>>>>>>>>> >>>>>             ElasticSearchIO
>>>>>>>>> >>>>>                      sink to write docs into ES.
>>>>>>>>> >>>>>                      With this, I want to be able to
>>>>>>>>> >>>>>                      1. ingest and transform rows from DB
>>>>>>>>> (done)
>>>>>>>>> >>>>>                      2. write JSON docs/strings into a new
>>>>>>>>> ES index
>>>>>>>>> >>>>> (done)
>>>>>>>>> >>>>>                      3. After (2) is complete and all
>>>>>>>>> documents are
>>>>>>>>> >>>>> written into
>>>>>>>>> >>>>>             a new index,
>>>>>>>>> >>>>>                      trigger an atomic index swap under an
>>>>>>>>> alias to
>>>>>>>>> >>>>> replace the
>>>>>>>>> >>>>>             current
>>>>>>>>> >>>>>                      aliased index with the new index
>>>>>>>>> generated in step
>>>>>>>>> >>>>> 2. This
>>>>>>>>> >>>>>             is basically
>>>>>>>>> >>>>>                      a single POST request to the ES cluster.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                      The problem I'm facing is that I don't
>>>>>>>>> seem to be
>>>>>>>>> >>>>> able to
>>>>>>>>> >>>>>             find a way to
>>>>>>>>> >>>>>                      have a way for (3) to happen after step
>>>>>>>>> (2) is
>>>>>>>>> >>>>> complete.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                      The ElasticSearchIO.Write transform
>>>>>>>>> returns a
>>>>>>>>> >>>>> PDone, and
>>>>>>>>> >>>>>             I'm not sure
>>>>>>>>> >>>>>                      how to proceed from there because it
>>>>>>>>> doesn't seem
>>>>>>>>> >>>>> to let me
>>>>>>>>> >>>>>             do another
>>>>>>>>> >>>>>                      apply on it to "define" a dependency.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> <
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> <
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> <
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> <
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> <
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> <
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> <
>>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                      Is there a recommended way to construct
>>>>>>>>> pipelines
>>>>>>>>> >>>>> workflows
>>>>>>>>> >>>>>             like this?
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                      Thanks in advance,
>>>>>>>>> >>>>>                      Philip
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>                  --     Jean-Baptiste Onofré
>>>>>>>>> >>>>>             jbono...@apache.org <mailto:jbono...@apache.org>
>>>>>>>>> >>>>>             <mailto:jbono...@apache.org <mailto:
>>>>>>>>> jbono...@apache.org>>
>>>>>>>>> >>>>>             http://blog.nanthrax.net
>>>>>>>>> >>>>>                  Talend - http://www.talend.com
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>         --         Jean-Baptiste Onofré
>>>>>>>>> >>>>>         jbono...@apache.org <mailto:jbono...@apache.org>
>>>>>>>>> >>>>>         http://blog.nanthrax.net
>>>>>>>>> >>>>>         Talend - http://www.talend.com
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>     --     Nick Verbeck - NerdyNick
>>>>>>>>> >>>>>     ----------------------------------------------------
>>>>>>>>> >>>>>     NerdyNick.com <http://NerdyNick.com>
>>>>>>>>> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>>>>>>>> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>> --
>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>> >>>> jbono...@apache.org
>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>> >>>> Talend - http://www.talend.com
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
>>>

Reply via email to