I figured out the Never.ever() approach and it seems to work. Will finish
this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this
will be quite a useful transform.

On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> I'm a bit confused by all of these suggestions: they sound plausible at a
> high level, but I'm having a hard time making any one of them concrete.
>
> So suppose we want to create a transform Wait.on(PCollection<?> signal):
> PCollection<T> -> PCollection<T>.
> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
> "a", but buffers panes of "a" in any given window until the final pane of
> "sig" in the same window is fired (or, if it's never fired, until the
> window closes? could use a deadletter for that maybe).
>
> This transform I suppose would need to have a keyed and unkeyed version.
>
> The keyed version would support merging window fns, and would require "a"
> and "sig" to be keyed by the same key, and would work using a CoGbk -
> followed by a stateful ParDo? Or is there a way to get away without a
> stateful ParDo here? (not all runners support it)
>
> The unkeyed version would not support merging window fns. Reuven, can you
> elaborate how your combiner idea would work here - in particular, what do
> you mean by "triggering only on the final pane"? Do you mean filter
> non-final panes before entering the combiner? I wonder if that'll work,
> probably worth a shot. And Kenn, can you elaborate on "re-trigger on the
> side input with a Never.ever() trigger"?
>
> Thanks.
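
The buffering rule described for Wait.on above can be sketched in plain Java. This is a single-threaded toy model, not Beam code: the class name WaitOnModel, its method names, and the use of a String as the window identifier are assumptions made purely for illustration. It holds back elements of "a" per window and releases them once the final pane of "sig" for that window arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the proposed Wait.on semantics; this is not Beam API.
final class WaitOnModel<T> {
  // Elements of "a" that are held back, grouped by window id.
  private final Map<String, List<T>> buffered = new HashMap<>();
  // Windows for which "sig" has already fired its final pane.
  private final Set<String> released = new HashSet<>();
  // Everything that has been let through so far, in release order.
  private final List<T> output = new ArrayList<>();

  // Called for each element of "a".
  void onElement(String window, T element) {
    if (released.contains(window)) {
      output.add(element); // Signal already final: pass straight through.
    } else {
      buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }
  }

  // Called for each pane of "sig"; only the final pane releases the buffer.
  void onSignalPane(String window, boolean isFinal) {
    if (!isFinal) {
      return;
    }
    released.add(window);
    List<T> held = buffered.remove(window);
    if (held != null) {
      output.addAll(held);
    }
  }

  List<T> output() {
    return output;
  }
}
```

The sketch leaves out the open question raised above of what to do when the window closes without a final signal pane.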
>
> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>
>> This is an interesting point.
>>
>> In the past, we've often just thought about sequencing some action to take
>> place after the sink, in which case you can simply use the sink output as a
>> main input. However if you want to run a transform with another PCollection
>> as a main input, this doesn't work. And as you've discovered, triggered
>> side inputs are defined to be non-deterministic, and there's no way to make
>> things line up.
>>
>> What you're describing only makes sense if you're blocking against the
>> final pane (since otherwise there's no reasonable way to match up somePC
>> panes with the sink panes). There are multiple ways you can do this: one
>> would be to CoGroupByKey (CoGBK) the two PCollections together, and trigger the new
>> transform only on the final pane. Another would be to add a combiner that
>> returns a Void, triggering only on the final pane, and then make this
>> singleton Void a side input. You could also do something explicit with the
>> state API.
>>
>> Reuven
>>
>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> So this appears not as easy as anticipated (surprise!)
>>>
>>> Suppose we have a PCollection "donePanes" with an element per
>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>>> data has been written; this pane is: final / non-final".
>>>
>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>>> happens only after the final pane has been written.
>>>
>>> In other words: we want a.apply(ParDo.of(b).withSideInputs(c)) to happen
>>> when c emits a *final* pane.
>>>
>>> Unfortunately, using
>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
>>> the trick: the side input becomes ready the moment *the first* pane of
>>> data has been written.
>>>
>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>>> only final panes...).apply(View.asSingleton())). It also becomes ready the
>>> moment *the first* pane has been written, you just get an exception if
>>> you access the side input before the *final* pane was written.
>>>
>>> I can't think of a pure-Beam solution to this: either "donePanes" will
>>> be used as a main input to something (and then everything else can only be
>>> a side input, which is not general enough), or it will be used as a side
>>> input (and then we can't achieve "trigger only after the final pane fires").
>>>
>>> It seems that we need a way to control the side input pushback, and
>>> configure whether a view becomes ready when its first pane has fired or
>>> when its last pane has fired. I could see this be a property on the View
>>> transform itself. In terms of implementation - I tried to figure out how
>>> side input readiness is determined, in the direct runner and Dataflow
>>> runner, and I'm completely lost and would appreciate some help.
>>>
>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> This sounds great!
>>>>
>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org>
>>>> wrote:
>>>>
>>>>> This would be absolutely great! It seems somewhat similar to the
>>>>> changes that were made to the BigQuery sink to support WriteResult (
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
>>>>> ).
>>>>>
>>>>> I find it helpful to think about the different things that may come
>>>>> after a sink. For instance:
>>>>>
>>>>> 1. It might be helpful to have a collection of failing input elements.
>>>>> The type of failed elements is pretty straightforward -- just the input
>>>>> elements. This allows handling such failures by directing them elsewhere
>>>>> or performing additional processing.
>>>>>
>>>>
>>>> BigQueryIO already does this as you point out.
>>>>
>>>>>
>>>>> 2. For a sink that produces a series of files, it might be useful to
>>>>> have a collection of the file names that have been completely written.
>>>>> This allows performing additional handling on these completed segments.
>>>>>
>>>>
>>>> In fact we already do this for FileBasedSinks. See
>>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>
>>>>>
>>>>> 3. For a sink that updates some destination, it would be reasonable to
>>>>> have a collection that provides (periodically) output indicating how
>>>>> complete the information written to that destination is. For instance,
>>>>> this might be something like "<this bigquery table> has all of the
>>>>> elements up to <input watermark>". This allows tracking how much
>>>>> information has been completely written out.
>>>>>
>>>>
>>>> Interesting. Maybe tough to do since sinks often don't have that
>>>> knowledge.
>>>>
>>>>
>>>>>
>>>>> I think those concepts map to the more detailed description Eugene
>>>>> provided, but I find it helpful to focus on what information comes out of
>>>>> the sink and how it might be used.
>>>>>
>>>>> Were there any use cases the above misses? Any functionality that has
>>>>> been described that doesn't map to these use cases?
>>>>>
>>>>> -- Ben
>>>>>
>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> It makes sense to consider how this maps onto existing kinds of sinks.
>>>>>>
>>>>>> E.g.:
>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write():
>>>>>> that will emit 1 result per bundle (either a bogus value or number of
>>>>>> records written) that will be Combine'd into 1 result per pane of input.
>>>>>> A user can sequence against this and be notified when some intermediate
>>>>>> amount of data has been written for a window, or (via .isFinal()) when
>>>>>> all of it has been written.
>>>>>> - Something that e.g. initiates an import job, such as
>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic
>>>>>> index swap: should emit 1 result per import job, e.g. containing
>>>>>> information about the job (e.g. its id and statistics). Role of panes is
>>>>>> the same.
>>>>>> - Something like above but that supports dynamic destinations: like
>>>>>> in WriteFiles, result will be PCollection<KV<DestinationT, ResultT>>
>>>>>> where ResultT may be something like a list of files that were written
>>>>>> for this pane of this destination.
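
The "1 result per bundle, Combine'd into 1 result per pane" idea above can be illustrated with a small plain-Java sketch. It is not Beam code: BundleResult, PaneResults, and the choice of a record count as the result are hypothetical names and assumptions for illustration only. In a real pipeline this step would presumably be a Combine over the sink's per-bundle outputs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-bundle write result: the pane it belongs to and how many
// records that bundle wrote. Not a Beam type.
final class BundleResult {
  final int paneIndex;
  final long recordsWritten;

  BundleResult(int paneIndex, long recordsWritten) {
    this.paneIndex = paneIndex;
    this.recordsWritten = recordsWritten;
  }
}

final class PaneResults {
  private PaneResults() {}

  // Sums the per-bundle counts so that each pane ends up with exactly one value.
  static Map<Integer, Long> combinePerPane(List<BundleResult> bundles) {
    Map<Integer, Long> perPane = new TreeMap<>();
    for (BundleResult b : bundles) {
      perPane.merge(b.paneIndex, b.recordsWritten, Long::sum);
    }
    return perPane;
  }
}
```

With dynamic destinations, the map key would become a (destination, pane) pair rather than the pane index alone.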
>>>>>>
>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I agree that the proper API for enabling the use case "do something
>>>>>>> after the data has been written" is to return a PCollection of objects
>>>>>>> where each object represents the result of writing some identifiable
>>>>>>> subset of the data. Then one can apply a ParDo to this PCollection, in
>>>>>>> order to "do something after this subset has been written".
>>>>>>>
>>>>>>> The challenging part here is *identifying* the subset of the data
>>>>>>> that's been written, in a way consistent with Beam's unified
>>>>>>> batch/streaming model, where saying "all data has been written" is not
>>>>>>> an option because more data can arrive.
>>>>>>>
>>>>>>> The next choice is "a window of input has been written", but then
>>>>>>> again, late data can arrive into a window as well.
>>>>>>>
>>>>>>> Next choice after that is "a pane of input has been written", but
>>>>>>> per https://s.apache.org/beam-sink-triggers the term "pane of
>>>>>>> input" is moot: triggering and panes should be something private to the
>>>>>>> sink, and the same input can trigger different sinks differently. The
>>>>>>> hypothetical different accumulation modes make this trickier still. I'm
>>>>>>> not sure whether we intend to also challenge the idea that windowing is
>>>>>>> inherent to the collection, or whether it too should be specified on a
>>>>>>> transform that processes the collection. I think for the sake of this
>>>>>>> discussion we can assume that it's inherent, and assume the mental model
>>>>>>> that the elements in different windows of a PCollection are processed
>>>>>>> independently - "as if" there were multiple pipelines processing each
>>>>>>> window.
>>>>>>>
>>>>>>> Overall, embracing the full picture, we end up with something like
>>>>>>> this:
>>>>>>> - The input PCollection is a composition of windows.
>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>>>>>>> windows), the below applies to the entire contents of the PCollection.
>>>>>>> If it's merging (e.g. session windows), then it applies per-key, and the
>>>>>>> input should be (perhaps implicitly) keyed in a way that the sink
>>>>>>> understands - for example, the grouping by destination in
>>>>>>> DynamicDestinations in file and bigquery writes.
>>>>>>> - Each window's contents is a "changelog" - stream of elements and
>>>>>>> retractions.
>>>>>>> - A "sink" processes each window of the collection, deciding how to
>>>>>>> handle elements and retractions (and whether to support retractions at
>>>>>>> all) in a sink-specific way, and deciding *when* to perform the side
>>>>>>> effects for a portion of the changelog (a "pane") based on the sink's
>>>>>>> triggering strategy.
>>>>>>> - If the side effect itself is parallelized, then there'll be
>>>>>>> multiple results for the pane - e.g. one per bundle.
>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of
>>>>>>> filenames that have been written, or simply a number of records that was
>>>>>>> written, or a bogus void value etc. The result will implicitly include
>>>>>>> the window of the input it's associated with. It will also implicitly
>>>>>>> include pane information - index of the pane in this window, and whether
>>>>>>> this is the first or last pane.
>>>>>>> - The partitioning into bundles is an implementation detail and not
>>>>>>> very useful, so before presenting the pane write results to the user,
>>>>>>> the sink will probably want to Combine the bundle results so that there
>>>>>>> ends up being 1 value for each pane that was written. Once again note
>>>>>>> that panes may be associated with windows of the input as a whole, but
>>>>>>> if the input is keyed (like with DynamicDestinations) they'll be
>>>>>>> associated with per-key subsets of windows of the input.
>>>>>>> - This combining requires an extra, well, combining operation, so it
>>>>>>> should be optional.
>>>>>>> - The user will end up getting either a PCollection<ResultT> or a
>>>>>>> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT,
>>>>>>> where the elements of this collection will implicitly have window and
>>>>>>> pane information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace the
>>>>>>> fact that trigger strategy is set on the input. But in that case the
>>>>>>> user will have to accept that the PaneInfo of ResultT's is not
>>>>>>> necessarily directly related to panes of the input - the sink is allowed
>>>>>>> to do internal aggregation as an implementation detail, which may modify
>>>>>>> the triggering strategy. Basically the user will still get sink-assigned
>>>>>>> panes.
>>>>>>> - In most cases, one may imagine that the user is interested in
>>>>>>> being notified of "no more data associated with this window will be
>>>>>>> written", so the user will ignore all ResultT's except those where the
>>>>>>> pane is marked final. If a user is interested in being notified of
>>>>>>> intermediate write results - they'll have to embrace the fact that they
>>>>>>> cannot identify the precise subset of input associated with the
>>>>>>> intermediate result.
>>>>>>>
>>>>>>> I think the really key points of the above are:
>>>>>>> - Sinks should support windowed input. Sinks should write different
>>>>>>> windows of input independently. If the sink can write multi-destination
>>>>>>> input, the destination should function as a grouping key, and in that
>>>>>>> case merging windowing should be allowed.
>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>> - When asked to produce results, sinks produce a PCollection of
>>>>>>> results that may be keyed or unkeyed (per above), and are placed in the
>>>>>>> window of the input that was written, and have a PaneInfo assigned by
>>>>>>> the sink, of which probably the only part useful to the user is whether
>>>>>>> it's .isFinal().
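
The last key point above, that users mostly care about results whose pane is final, can be sketched in plain Java. Nothing here is Beam API: SinkResult and FinalPaneFilter are hypothetical names, and the window and pane fields stand in for what Beam would carry implicitly via BoundedWindow and PaneInfo (in the Beam Java SDK the corresponding accessor on PaneInfo is isLast()).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one sink result together with the window and pane
// information that would travel with it implicitly. Not a Beam type.
final class SinkResult {
  final String window;
  final long paneIndex;
  final boolean isFinal;
  final String payload;

  SinkResult(String window, long paneIndex, boolean isFinal, String payload) {
    this.window = window;
    this.paneIndex = paneIndex;
    this.isFinal = isFinal;
    this.payload = payload;
  }
}

final class FinalPaneFilter {
  private FinalPaneFilter() {}

  // Keeps only results from final panes, i.e. "no more data for this window
  // will be written"; intermediate results are dropped.
  static List<SinkResult> finalOnly(List<SinkResult> results) {
    List<SinkResult> out = new ArrayList<>();
    for (SinkResult r : results) {
      if (r.isFinal) {
        out.add(r);
      }
    }
    return out;
  }
}
```

A follow-up action such as an index swap would then run once per surviving element, i.e. once per window (or per key and window for keyed results).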
>>>>>>>
>>>>>>> Does this sound reasonable?
>>>>>>>
>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +1
>>>>>>>>
>>>>>>>> At the very least an empty PCollection<?> could be produced with no
>>>>>>>> promises about its contents but the ability to be followed (e.g. as a
>>>>>>>> side input), which is forward compatible with whatever actual metadata
>>>>>>>> one may decide to produce in the future.
>>>>>>>>
>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com>
>>>>>>>> wrote:
>>>>>>>> > +dev@
>>>>>>>> >
>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to
>>>>>>>> > understand and a good way for an IO to communicate and establish causal
>>>>>>>> > dependencies. Converting an IO from PDone to real output may spur further
>>>>>>>> > useful thoughts based on the design decisions about what sort of output
>>>>>>>> > is most useful.
>>>>>>>> >
>>>>>>>> > Kenn
>>>>>>>> >
>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> >>
>>>>>>>> >> I think all sinks actually do have valuable information to output which
>>>>>>>> >> can be used after a write (file names, transaction/commit/row ids, table
>>>>>>>> >> names, ...). In addition to this metadata, having a PCollection of all
>>>>>>>> >> successful writes and all failed writes is useful for users so they can
>>>>>>>> >> chain an action which depends on what was or wasn't successfully written.
>>>>>>>> >> Users have requested adding retry/failure handling policies to sinks so that
>>>>>>>> >> failed writes don't jam up the pipeline.
>>>>>>>> >>
>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich
>>>>>>>> >> <chet.aldr...@postmates.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> So I agree generally with the idea that returning a PCollection makes all
>>>>>>>> >>> of this easier so that arbitrary additional functions can be added, but what
>>>>>>>> >>> exactly would write functions be returning in a PCollection that would make
>>>>>>>> >>> sense? The whole idea is that we’ve written to an external source and now
>>>>>>>> >>> the collection itself is no longer needed.
>>>>>>>> >>>
>>>>>>>> >>> Currently, that’s represented with a PDone, but that doesn’t
>>>>>>>> >>> allow any work to occur after it. I see a couple possible ways of handling
>>>>>>>> >>> this given this conversation, and am curious which solution sounds like the
>>>>>>>> >>> best way to deal with the problem:
>>>>>>>> >>>
>>>>>>>> >>> 1. Have output transforms always return something specific (which would
>>>>>>>> >>> be the same across transforms by convention), that is in the form of a
>>>>>>>> >>> PCollection, so operations can occur after it.
>>>>>>>> >>>
>>>>>>>> >>> 2. Make either PDone or some new type that can act as a PCollection so we
>>>>>>>> >>> can run applies afterward.
>>>>>>>> >>>
>>>>>>>> >>> 3. Make output transforms provide the facility for a callback function
>>>>>>>> >>> which runs after the transform is complete.
>>>>>>>> >>>
>>>>>>>> >>> I went through these gymnastics recently when I was trying to build
>>>>>>>> >>> something that would move indices after writing to Algolia, and the solution
>>>>>>>> >>> was to co-opt code from the old Sink class that used to exist in Beam. The
>>>>>>>> >>> problem is that particular method requires the output transform in question
>>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t make sense to
>>>>>>>> >>> return one. This seems like a bad solution, but unfortunately there isn’t a
>>>>>>>> >>> notion of a transform that has no explicit output that needs to have
>>>>>>>> >>> operations occur after it.
>>>>>>>> >>>
>>>>>>>> >>> The three potential solutions above address this issue, but I would like
>>>>>>>> >>> to hear which would be preferable (or perhaps a different proposal
>>>>>>>> >>> altogether?). Perhaps we could also start up a ticket on this, since it
>>>>>>>> >>> seems like a worthwhile feature addition. I would find it really useful, for
>>>>>>>> >>> one.
>>>>>>>> >>>
>>>>>>>> >>> Chet
>>>>>>>> >>>
>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is returned
>>>>>>>> >>> containing the result of the sink so that any arbitrary additional functions
>>>>>>>> >>> can be applied.
>>>>>>>> >>>
>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré
>>>>>>>> >>> <j...@nanthrax.net> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than in the
>>>>>>>> >>>> main.
>>>>>>>> >>>>
>>>>>>>> >>>> Regards
>>>>>>>> >>>> JB
>>>>>>>> >>>>
>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO instead.
>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I need to finish this
>>>>>>>> >>>>> up...). It would really be nice for most IOs to support something like
>>>>>>>> >>>>> this.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from
>>>>>>>> >>>>> the BigtableIO, and then feed that into your function which will run when
>>>>>>>> >>>>> all writes finish.
>>>>>>>> >>>>>
>>>>>>>> >>>>> You probably want to avoid doing something in the main method because
>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the driver will die, get
>>>>>>>> >>>>> killed, machine will explode, etc).
>>>>>>>> >>>>>
>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>>     Assuming you're in Java, you could just follow on in your main
>>>>>>>> >>>>>     method, checking the state of the result.
>>>>>>>> >>>>>
>>>>>>>> >>>>>     Example:
>>>>>>>> >>>>>     PipelineResult result = pipeline.run();
>>>>>>>> >>>>>     try {
>>>>>>>> >>>>>       result.waitUntilFinish();
>>>>>>>> >>>>>       if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>> >>>>>         // Do the ES work here.
>>>>>>>> >>>>>       }
>>>>>>>> >>>>>     } catch (Exception e) {
>>>>>>>> >>>>>       result.cancel();
>>>>>>>> >>>>>       throw e;
>>>>>>>> >>>>>     }
>>>>>>>> >>>>>
>>>>>>>> >>>>>     Otherwise you could also use Oozie to construct a workflow.
>>>>>>>> >>>>>
>>>>>>>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>>> >>>>>     <j...@nanthrax.net> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>>         Hi,
>>>>>>>> >>>>>
>>>>>>>> >>>>>         yes, we had a similar question some days ago.
>>>>>>>> >>>>>
>>>>>>>> >>>>>         We can imagine having a user callback fn fired when the sink
>>>>>>>> >>>>>         batch is complete.
>>>>>>>> >>>>>
>>>>>>>> >>>>>         Let me think about that.
>>>>>>>> >>>>>
>>>>>>>> >>>>>         Regards
>>>>>>>> >>>>>         JB
>>>>>>>> >>>>>
>>>>>>>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>>             Hey JB,
>>>>>>>> >>>>>
>>>>>>>> >>>>>             Thanks for getting back so quickly.
>>>>>>>> >>>>>             I suppose in that case I would need a way of monitoring
>>>>>>>> >>>>>             when the ES transform completes successfully before I can
>>>>>>>> >>>>>             proceed with doing the swap.
>>>>>>>> >>>>>             The problem with this is that I can't think of a good way
>>>>>>>> >>>>>             to determine that termination state short of polling the
>>>>>>>> >>>>>             new index to check the document count compared to the size
>>>>>>>> >>>>>             of input PCollection.
>>>>>>>> >>>>>             That, or maybe I'd need to use an external system like you
>>>>>>>> >>>>>             mentioned to poll on the state of the pipeline (I'm using
>>>>>>>> >>>>>             Google Dataflow, so maybe there's a way to do this with
>>>>>>>> >>>>>             some API).
>>>>>>>> >>>>>             But I would have thought that there would be an easy way
>>>>>>>> >>>>>             of simply saying "do not process this transform until this
>>>>>>>> >>>>>             other transform completes".
>>>>>>>> >>>>>             Is there no established way of "signaling" between
>>>>>>>> >>>>>             pipelines when some pipeline completes, or have some way
>>>>>>>> >>>>>             of declaring a dependency of 1 transform on another
>>>>>>>> >>>>>             transform?
>>>>>>>> >>>>>
>>>>>>>> >>>>>             Thanks again,
>>>>>>>> >>>>>             Philip
>>>>>>>> >>>>>
>>>>>>>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>>>>>>>> >>>>>             <j...@nanthrax.net> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>>                  Hi Philip,
>>>>>>>> >>>>>
>>>>>>>> >>>>>                  You won't be able to do (3) in the same pipeline as the
>>>>>>>> >>>>>                  Elasticsearch Sink PTransform ends the pipeline with PDone.
>>>>>>>> >>>>>
>>>>>>>> >>>>>                  So, (3) has to be done in another pipeline (using a DoFn)
>>>>>>>> >>>>>                  or in another "system" (like Camel for instance). I would
>>>>>>>> >>>>>                  do a check of the data in the index and then trigger the
>>>>>>>> >>>>>                  swap there.
>>>>>>>> >>>>>
>>>>>>>> >>>>>                  Regards
>>>>>>>> >>>>>                  JB
>>>>>>>> >>>>>
>>>>>>>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>>                      Hi,
>>>>>>>> >>>>>
>>>>>>>> >>>>>                      I'm pretty new to Beam, and I've been trying to use the
>>>>>>>> >>>>>                      ElasticsearchIO sink to write docs into ES.
>>>>>>>> >>>>>                      With this, I want to be able to
>>>>>>>> >>>>>                      1. ingest and transform rows from DB (done)
>>>>>>>> >>>>>                      2. write JSON docs/strings into a new ES index (done)
>>>>>>>> >>>>>                      3. After (2) is complete and all documents are written
>>>>>>>> >>>>>                      into a new index, trigger an atomic index swap under an
>>>>>>>> >>>>>                      alias to replace the current aliased index with the new
>>>>>>>> >>>>>                      index generated in step 2. This is basically a single
>>>>>>>> >>>>>                      POST request to the ES cluster.
>>>>>>>> >>>>>
>>>>>>>> >>>>>                      The problem I'm facing is that I don't seem to be able
>>>>>>>> >>>>>                      to find a way for (3) to happen after step (2) is
>>>>>>>> >>>>>                      complete.
>>>>>>>> >>>>>
>>>>>>>> >>>>>                      The ElasticsearchIO.Write transform returns a PDone, and
>>>>>>>> >>>>>                      I'm not sure how to proceed from there because it doesn't
>>>>>>>> >>>>>                      seem to let me do another apply on it to "define" a
>>>>>>>> >>>>>                      dependency.
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>> >>>>>
>>>>>>>> >>>>>                      Is there a recommended way to construct pipeline
>>>>>>>> >>>>>                      workflows like this?
>>>>>>>> >>>>>
>>>>>>>> >>>>>                      Thanks in advance,
>>>>>>>> >>>>>                      Philip
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>                  --     Jean-Baptiste Onofré
>>>>>>>> >>>>>                  jbono...@apache.org
>>>>>>>> >>>>>                  http://blog.nanthrax.net
>>>>>>>> >>>>>                  Talend - http://www.talend.com
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>         --         Jean-Baptiste Onofré
>>>>>>>> >>>>>         jbono...@apache.org
>>>>>>>> >>>>>         http://blog.nanthrax.net
>>>>>>>> >>>>>         Talend - http://www.talend.com
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>     --     Nick Verbeck - NerdyNick
>>>>>>>> >>>>>     ----------------------------------------------------
>>>>>>>> >>>>>     NerdyNick.com <http://NerdyNick.com>
>>>>>>>> >>>>>     TrailsOffroad.com <http://TrailsOffroad.com>
>>>>>>>> >>>>>     NoKnownBoundaries.com <http://NoKnownBoundaries.com>
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>
>>>>>>>> >>>> --
>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>> >>>> jbono...@apache.org
>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>> >>>> Talend - http://www.talend.com
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>
