I figured out the Never.ever() approach and it seems to work. Will finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this will be quite a useful transform.
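For the curious, here is roughly the shape of the trick - a sketch only, with illustrative names (the PR will have the real API; the Duration.ZERO allowed lateness is just to keep the example self-contained):

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

static <T, SignalT> PCollection<T> waitOn(
    PCollection<T> input, PCollection<SignalT> signal) {
  // Re-trigger the signal with Never.ever(): the derived view then has no
  // speculative panes, and only becomes ready once the window is done,
  // i.e. after the final pane of the signal.
  final PCollectionView<Long> doneView =
      signal
          .apply("NeverTrigger",
              Window.<SignalT>configure()
                  .triggering(Never.ever())
                  .withAllowedLateness(Duration.ZERO)
                  .discardingFiredPanes())
          .apply("OnePerWindow",
              Combine.globally(Count.<SignalT>combineFn()).withoutDefaults())
          .apply(View.asSingleton());
  return input.apply("GateOnSignal",
      ParDo.of(new DoFn<T, T>() {
        @ProcessElement
        public void process(ProcessContext c) {
          c.sideInput(doneView); // side-input pushback does the waiting
          c.output(c.element());
        }
      }).withSideInputs(doneView));
}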
On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> I'm a bit confused by all of these suggestions: they sound plausible at a high level, but I'm having a hard time making any one of them concrete.
>
> So suppose we want to create a transform Wait.on(PCollection<?> signal): PCollection<T> -> PCollection<T>. a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to "a", but buffers panes of "a" in any given window until the final pane of "sig" in the same window is fired (or, if it's never fired, until the window closes? could use a deadletter for that, maybe).
>
> This transform, I suppose, would need to have a keyed and an unkeyed version.
>
> The keyed version would support merging window fns, would require "a" and "sig" to be keyed by the same key, and would work using a CoGBK followed by a stateful ParDo? Or is there a way to get away without a stateful ParDo here? (Not all runners support it.)
>
> The unkeyed version would not support merging window fns. Reuven, can you elaborate on how your combiner idea would work here - in particular, what do you mean by "triggering only on the final pane"? Do you mean filtering out non-final panes before entering the combiner? I wonder if that'll work; probably worth a shot. And Kenn, can you elaborate on "re-trigger on the side input with a Never.ever() trigger"?
>
> Thanks.
>
> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>
>> This is an interesting point.
>>
>> In the past, we've often just thought about sequencing some action to take place after the sink, in which case you can simply use the sink output as a main input. However, if you want to run a transform with another PCollection as a main input, this doesn't work. And as you've discovered, triggered side inputs are defined to be non-deterministic, and there's no way to make things line up.
>>
>> What you're describing only makes sense if you're blocking against the final pane (since otherwise there's no reasonable way to match up somePC panes with the sink panes). There are multiple ways you can do this: one would be to CoGBK the two PCollections together, and trigger the new transform only on the final pane. Another would be to add a combiner that returns a Void, triggering only on the final pane, and then make this singleton Void a side input. You could also do something explicit with the state API.
>>
>> Reuven
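>>
>> For concreteness, the combiner variant might look roughly like this - a sketch, untested; whether side-input readiness actually waits for the final pane is exactly what Eugene is asking above. DoneT stands for the signal's element type, and the step names are illustrative:
>>
>> PCollectionView<Void> doneView =
>>     donePanes
>>         .apply("FinalPanesOnly", ParDo.of(new DoFn<DoneT, DoneT>() {
>>           @ProcessElement
>>           public void process(ProcessContext c) {
>>             if (c.pane().isLast()) {
>>               c.output(c.element());
>>             }
>>           }
>>         }))
>>         .apply("ToVoid", MapElements.into(TypeDescriptors.voids())
>>             .via(x -> (Void) null))
>>         .apply("OnePerPane",
>>             Combine.globally((Iterable<Void> xs) -> (Void) null)
>>                 .withoutDefaults())
>>         .apply(View.asSingleton());
>>
>> somePc.apply(ParDo.of(fn).withSideInputs(doneView));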
>>
>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> So this appears not as easy as anticipated (surprise!)
>>>
>>> Suppose we have a PCollection "donePanes" with an element per fully-processed pane: e.g. a BigQuery sink, with elements saying "a pane of data has been written; this pane is: final / non-final".
>>>
>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn)) happens only after the final pane has been written.
>>>
>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen when c emits a *final* pane.
>>>
>>> Unfortunately, using ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do the trick: the side input becomes ready the moment *the first* pane of data has been written.
>>>
>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final panes...).apply(View.asSingleton())). It also becomes ready the moment *the first* pane has been written; you just get an exception if you access the side input before the *final* pane has been written.
>>>
>>> I can't think of a pure-Beam solution to this: either "donePanes" will be used as a main input to something (and then everything else can only be a side input, which is not general enough), or it will be used as a side input (and then we can't achieve "trigger only after the final pane fires").
>>>
>>> It seems that we need a way to control the side input pushback, and to configure whether a view becomes ready when its first pane has fired or when its last pane has fired. I could see this being a property on the View transform itself. In terms of implementation, I tried to figure out how side input readiness is determined in the direct runner and the Dataflow runner, and I'm completely lost and would appreciate some help.
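>>>
>>> For illustration, such a knob might look like this - purely hypothetical API; readyOn and PaneReadiness don't exist today:
>>>
>>> // Hypothetical: defer view readiness until the last pane has fired.
>>> donePanes.apply(View.<DoneT>asSingleton().readyOn(PaneReadiness.FINAL_PANE));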
>>>
>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> This sounds great!
>>>>
>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org> wrote:
>>>>
>>>>> This would be absolutely great! It seems somewhat similar to the changes that were made to the BigQuery sink to support WriteResult (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>>>>>
>>>>> I find it helpful to think about the different things that may come after a sink. For instance:
>>>>>
>>>>> 1. It might be helpful to have a collection of failing input elements. The type of failed elements is pretty straightforward -- just the input elements. This allows handling such failures by directing them elsewhere or performing additional processing.
>>>>
>>>> BigQueryIO already does this, as you point out.
>>>>
>>>>> 2. For a sink that produces a series of files, it might be useful to have a collection of the file names that have been completely written. This allows performing additional handling on these completed segments.
>>>>
>>>> In fact we already do this for FileBasedSinks. See https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>
>>>>> 3. For a sink that updates some destination, it would be reasonable to have a collection that provides (periodic) output indicating how complete the information written to that destination is. For instance, this might be something like "<this BigQuery table> has all of the elements up to <input watermark>". This allows tracking how much information has been completely written out.
>>>>
>>>> Interesting. Maybe tough to do, since sinks often don't have that knowledge.
>>>>
>>>>> I think those concepts map to the more detailed description Eugene provided, but I find it helpful to focus on what information comes out of the sink and how it might be used.
>>>>>
>>>>> Are there any use cases the above misses? Any functionality that has been described that doesn't map to these use cases?
>>>>>
>>>>> -- Ben
>>>>>
>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>
>>>>>> It makes sense to consider how this maps onto existing kinds of sinks. E.g.:
>>>>>>
>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write(): that will emit 1 result per bundle (either a bogus value or the number of records written), which will be Combine'd into 1 result per pane of input. A user can sequence against this and be notified when some intermediate amount of data has been written for a window, or (via .isFinal()) when all of it has been written.
>>>>>>
>>>>>> - Something that initiates an import job, such as BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic index swap: should emit 1 result per import job, e.g. containing information about the job (such as its id and statistics). The role of panes is the same.
>>>>>>
>>>>>> - Something like the above but that supports dynamic destinations: like in WriteFiles, the result will be a PCollection<KV<DestinationT, ResultT>> where ResultT may be something like the list of files that were written for this pane of this destination.
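>>>>>>
>>>>>> A rough sketch of sequencing against that last shape - "records" and "sink" are placeholders, and getPerDestinationOutputFilenames is from the WriteFilesResult class linked above:
>>>>>>
>>>>>> WriteFilesResult<String> result = records.apply(WriteFiles.to(sink));
>>>>>> result
>>>>>>     .getPerDestinationOutputFilenames() // PCollection<KV<String, String>>
>>>>>>     .apply("NotifyPerDestination",
>>>>>>         ParDo.of(new DoFn<KV<String, String>, Void>() {
>>>>>>           @ProcessElement
>>>>>>           public void process(ProcessContext c) {
>>>>>>             // key = destination, value = a filename written for this
>>>>>>             // pane of this destination
>>>>>>           }
>>>>>>         }));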
>>>>>>
>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>
>>>>>>> I agree that the proper API for enabling the use case "do something after the data has been written" is to return a PCollection of objects where each object represents the result of writing some identifiable subset of the data. Then one can apply a ParDo to this PCollection, in order to "do something after this subset has been written".
>>>>>>>
>>>>>>> The challenging part here is *identifying* the subset of the data that's been written, in a way consistent with Beam's unified batch/streaming model, where saying "all data has been written" is not an option because more data can arrive.
>>>>>>>
>>>>>>> The next choice is "a window of input has been written", but then again, late data can arrive into a window as well.
>>>>>>>
>>>>>>> The next choice after that is "a pane of input has been written", but per https://s.apache.org/beam-sink-triggers the term "pane of input" is moot: triggering and panes should be something private to the sink, and the same input can trigger different sinks differently. The hypothetical different accumulation modes make this trickier still. I'm not sure whether we intend to also challenge the idea that windowing is inherent to the collection, or whether it too should be specified on a transform that processes the collection. I think for the sake of this discussion we can assume that it's inherent, and assume the mental model that the elements in different windows of a PCollection are processed independently - "as if" there were multiple pipelines processing each window.
>>>>>>>
>>>>>>> Overall, embracing the full picture, we end up with something like this:
>>>>>>> - The input PCollection is a composition of windows.
>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding windows), the below applies to the entire contents of the PCollection. If it's merging (e.g. session windows), then it applies per-key, and the input should be (perhaps implicitly) keyed in a way that the sink understands - for example, the grouping by destination in DynamicDestinations in file and BigQuery writes.
>>>>>>> - Each window's contents is a "changelog" - a stream of elements and retractions.
>>>>>>> - A "sink" processes each window of the collection, deciding how to handle elements and retractions (and whether to support retractions at all) in a sink-specific way, and deciding *when* to perform the side effects for a portion of the changelog (a "pane") based on the sink's triggering strategy.
>>>>>>> - If the side effect itself is parallelized, then there'll be multiple results for the pane - e.g. one per bundle.
>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of filenames that have been written, or simply the number of records that were written, or a bogus void value, etc. The result will implicitly include the window of the input it's associated with. It will also implicitly include pane information - the index of the pane in this window, and whether this is the first or last pane.
>>>>>>> - The partitioning into bundles is an implementation detail and not very useful, so before presenting the pane write results to the user, the sink will probably want to Combine the bundle results so that there ends up being 1 value for each pane that was written. Once again, note that panes may be associated with windows of the input as a whole, but if the input is keyed (like with DynamicDestinations) they'll be associated with per-key subsets of windows of the input.
>>>>>>> - This combining requires an extra, well, combining operation, so it should be optional.
>>>>>>> - The user will end up getting either a PCollection<ResultT> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where the elements of this collection will implicitly have window and pane information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace the fact that the trigger strategy is set on the input. But in that case the user will have to accept that the PaneInfo of the ResultT's is not necessarily directly related to panes of the input - the sink is allowed to do internal aggregation as an implementation detail, which may modify the triggering strategy. Basically, the user will still get sink-assigned panes.
>>>>>>> - In most cases, one may imagine that the user is interested in being notified that "no more data associated with this window will be written", so the user will ignore all ResultT's except those where the pane is marked final. If a user is interested in being notified of intermediate write results, they'll have to embrace the fact that they cannot identify the precise subset of input associated with the intermediate result.
>>>>>>>
>>>>>>> I think the really key points of the above are:
>>>>>>> - Sinks should support windowed input. Sinks should write different windows of input independently. If the sink can write multi-destination input, the destination should function as a grouping key, and in that case merging windowing should be allowed.
>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>> - When asked to produce results, sinks produce a PCollection of results that may be keyed or unkeyed (per above), are placed in the window of the input that was written, and have a PaneInfo assigned by the sink, of which probably the only part useful to the user is whether it's .isFinal().
>>>>>>>
>>>>>>> Does this sound reasonable?
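>>>>>>>
>>>>>>> A small sketch of the "1 value per pane" combine and the final-pane-only consumption described above ("perBundleCounts" stands for the sink's raw per-bundle results):
>>>>>>>
>>>>>>> PCollection<Long> perPane =
>>>>>>>     perBundleCounts.apply("OnePerPane",
>>>>>>>         Combine.globally(Sum.ofLongs()).withoutDefaults());
>>>>>>>
>>>>>>> perPane.apply("FinalPaneOnly", ParDo.of(new DoFn<Long, Void>() {
>>>>>>>   @ProcessElement
>>>>>>>   public void process(ProcessContext c) {
>>>>>>>     if (c.pane().isLast()) {
>>>>>>>       // "no more data associated with this window will be written"
>>>>>>>     }
>>>>>>>   }
>>>>>>> }));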
>>>>>>>
>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>
>>>>>>>> +1
>>>>>>>>
>>>>>>>> At the very least, an empty PCollection<?> could be produced with no promises about its contents but the ability to be followed (e.g. as a side input), which is forward compatible with whatever actual metadata one may decide to produce in the future.
>>>>>>>>
>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>> > +dev@
>>>>>>>> >
>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to understand and a good way for an IO to communicate and establish causal dependencies. Converting an IO from PDone to real output may spur further useful thoughts based on the design decisions about what sort of output is most useful.
>>>>>>>> >
>>>>>>>> > Kenn
>>>>>>>> >
>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> >>
>>>>>>>> >> I think all sinks actually do have valuable information to output which can be used after a write (file names, transaction/commit/row ids, table names, ...). In addition to this metadata, having a PCollection of all successful writes and all failed writes is useful for users so they can chain an action which depends on what was or wasn't successfully written. Users have requested adding retry/failure-handling policies to sinks so that failed writes don't jam up the pipeline.
>>>>>>>> >>
>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> So I agree generally with the idea that returning a PCollection makes all of this easier, so that arbitrary additional functions can be added - but what exactly would write functions be returning in a PCollection that would make sense? The whole idea is that we've written to an external source and now the collection itself is no longer needed.
>>>>>>>> >>>
>>>>>>>> >>> Currently that's represented with a PDone, which doesn't allow any work to occur after it. I see a couple of possible ways of handling this given this conversation, and am curious which solution sounds like the best way to deal with the problem:
>>>>>>>> >>>
>>>>>>>> >>> 1. Have output transforms always return something specific (which would be the same across transforms by convention) in the form of a PCollection, so operations can occur after it.
>>>>>>>> >>>
>>>>>>>> >>> 2. Make either PDone or some new type that can act as a PCollection, so we can run applies afterward.
>>>>>>>> >>>
>>>>>>>> >>> 3. Make output transforms provide the facility for a callback function which runs after the transform is complete.
>>>>>>>> >>>
>>>>>>>> >>> I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and the solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that that particular method requires the output transform in question to return a PCollection, even if it is trivial or doesn't make sense to return one. This seems like a bad solution, but unfortunately there isn't a notion of a transform that has no explicit output but needs to have operations occur after it.
>>>>>>>> >>>
>>>>>>>> >>> The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also start up a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.
>>>>>>>> >>>
>>>>>>>> >>> Chet
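>>>>>>>> >>>
>>>>>>>> >>> For what it's worth, option 1 as code might look like this - a sketch; AlgoliaIO, WriteResult, and MoveIndicesFn are hypothetical here, and the point is just the conventional result type in place of PDone:
>>>>>>>> >>>
>>>>>>>> >>> PCollection<WriteResult> results = docs.apply(AlgoliaIO.write(config));
>>>>>>>> >>> results.apply("MoveIndices", ParDo.of(new MoveIndicesFn()));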
>>>>>>>> >>>
>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is returned containing the result of the sink, so that any arbitrary additional functions can be applied.
>>>>>>>> >>>
>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than in the main.
>>>>>>>> >>>>
>>>>>>>> >>>> Regards
>>>>>>>> >>>> JB
>>>>>>>> >>>>
>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO instead. I have a pull request open here [1] (which reminds me I need to finish this up...). It would really be nice for most IOs to support something like this.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from the BigtableIO, and then feed that into your function, which will run when all writes finish.
>>>>>>>> >>>>>
>>>>>>>> >>>>> You probably want to avoid doing something in the main method, because there's no guarantee it'll actually run (maybe the driver will die, get killed, the machine will explode, etc).
>>>>>>>> >>>>>
>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
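>>>>>>>> >>>>>
>>>>>>>> >>>>> The pattern looks roughly like this (a sketch; "writeResults" is whatever the IO emits per completed write, ResultT is its element type, and the Void key is just to gather everything into one group per window):
>>>>>>>> >>>>>
>>>>>>>> >>>>> writeResults
>>>>>>>> >>>>>     .apply("KeyAll", WithKeys.of((Void) null))
>>>>>>>> >>>>>     .apply(GroupByKey.create())
>>>>>>>> >>>>>     .apply("AfterWrites",
>>>>>>>> >>>>>         ParDo.of(new DoFn<KV<Void, Iterable<ResultT>>, Void>() {
>>>>>>>> >>>>>           @ProcessElement
>>>>>>>> >>>>>           public void process(ProcessContext c) {
>>>>>>>> >>>>>             // runs once per window/pane, after the grouped writes
>>>>>>>> >>>>>           }
>>>>>>>> >>>>>         }));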
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Assuming you're in Java, you could just follow on in your main method, checking the state of the Result.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Example:
>>>>>>>> >>>>>
>>>>>>>> >>>>> PipelineResult result = pipeline.run();
>>>>>>>> >>>>> try {
>>>>>>>> >>>>>     result.waitUntilFinish();
>>>>>>>> >>>>>     if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>> >>>>>         // DO ES work
>>>>>>>> >>>>>     }
>>>>>>>> >>>>> } catch (Exception e) {
>>>>>>>> >>>>>     result.cancel();
>>>>>>>> >>>>>     throw e;
>>>>>>>> >>>>> }
>>>>>>>> >>>>>
>>>>>>>> >>>>> Otherwise you could also use Oozie to construct a workflow.
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Hi,
>>>>>>>> >>>>>
>>>>>>>> >>>>> yes, we had a similar question some days ago.
>>>>>>>> >>>>>
>>>>>>>> >>>>> We can imagine having a user callback fn fired when the sink batch is complete.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Let me think about that.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Regards
>>>>>>>> >>>>> JB
>>>>>>>> >>>>>
>>>>>>>> >>>>> On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Hey JB,
>>>>>>>> >>>>>
>>>>>>>> >>>>> Thanks for getting back so quickly.
>>>>>>>> >>>>> I suppose in that case I would need a way of monitoring when the ES transform completes successfully before I can proceed with doing the swap. The problem with this is that I can't think of a good way to determine that termination state short of polling the new index to check the document count compared to the size of the input PCollection. That, or maybe I'd need to use an external system like you mentioned to poll on the state of the pipeline (I'm using Google Dataflow, so maybe there's a way to do this with some API).
>>>>>>>> >>>>>
>>>>>>>> >>>>> But I would have thought that there would be an easy way of simply saying "do not process this transform until this other transform completes".
>>>>>>>> >>>>> Is there no established way of "signaling" between pipelines when some pipeline completes, or some way of declaring a dependency of one transform on another transform?
>>>>>>>> >>>>>
>>>>>>>> >>>>> Thanks again,
>>>>>>>> >>>>> Philip
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Hi Philip,
>>>>>>>> >>>>>
>>>>>>>> >>>>> You won't be able to do (3) in the same pipeline, as the Elasticsearch sink PTransform ends the pipeline with PDone.
>>>>>>>> >>>>>
>>>>>>>> >>>>> So (3) has to be done in another pipeline (using a DoFn) or in another "system" (like Camel, for instance). I would do a check of the data in the index and then trigger the swap there.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Regards
>>>>>>>> >>>>> JB
>>>>>>>> >>>>>
>>>>>>>> >>>>> On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Hi,
>>>>>>>> >>>>>
>>>>>>>> >>>>> I'm pretty new to Beam, and I've been trying to use the ElasticSearchIO sink to write docs into ES.
>>>>>>>> >>>>> With this, I want to be able to:
>>>>>>>> >>>>> 1. ingest and transform rows from a DB (done)
>>>>>>>> >>>>> 2. write JSON docs/strings into a new ES index (done)
>>>>>>>> >>>>> 3. After (2) is complete and all documents are written into a new index, trigger an atomic index swap under an alias to replace the current aliased index with the new index generated in step 2. This is basically a single POST request to the ES cluster.
>>>>>>>> >>>>>
>>>>>>>> >>>>> The problem I'm facing is that I don't seem to be able to find a way for (3) to happen after step (2) is complete.
>>>>>>>> >>>>>
>>>>>>>> >>>>> The ElasticSearchIO.Write transform returns a PDone, and I'm not sure how to proceed from there because it doesn't seem to let me do another apply on it to "define" a dependency:
>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>> >>>>>
>>>>>>>> >>>>> Is there a recommended way to construct pipeline workflows like this?
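>>>>>>>> >>>>>
>>>>>>>> >>>>> For context, the swap in (3) is a single call against Elasticsearch's _aliases endpoint. A sketch (using Java 11's HttpClient; host and index names are placeholders, error handling elided):
>>>>>>>> >>>>>
>>>>>>>> >>>>> String body =
>>>>>>>> >>>>>     "{\"actions\": ["
>>>>>>>> >>>>>         + "{\"remove\": {\"index\": \"docs_v1\", \"alias\": \"docs\"}},"
>>>>>>>> >>>>>         + "{\"add\": {\"index\": \"docs_v2\", \"alias\": \"docs\"}}"
>>>>>>>> >>>>>         + "]}";
>>>>>>>> >>>>> HttpRequest swap = HttpRequest.newBuilder()
>>>>>>>> >>>>>     .uri(URI.create("http://localhost:9200/_aliases"))
>>>>>>>> >>>>>     .header("Content-Type", "application/json")
>>>>>>>> >>>>>     .POST(HttpRequest.BodyPublishers.ofString(body))
>>>>>>>> >>>>>     .build();
>>>>>>>> >>>>> HttpClient.newHttpClient().send(swap, HttpResponse.BodyHandlers.ofString());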
>>>>>>>> >>>>>
>>>>>>>> >>>>> Thanks in advance,
>>>>>>>> >>>>> Philip
>>>>>>>> >>>>>
>>>>>>>> >>>>> --
>>>>>>>> >>>>> Jean-Baptiste Onofré
>>>>>>>> >>>>> jbono...@apache.org
>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>> >>>>>
>>>>>>>> >>>>> --
>>>>>>>> >>>>> Jean-Baptiste Onofré
>>>>>>>> >>>>> jbono...@apache.org
>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>> >>>>>
>>>>>>>> >>>>> --
>>>>>>>> >>>>> Nick Verbeck - NerdyNick
>>>>>>>> >>>>> ----------------------------------------------------
>>>>>>>> >>>>> NerdyNick.com
>>>>>>>> >>>>> TrailsOffroad.com
>>>>>>>> >>>>> NoKnownBoundaries.com
>>>>>>>> >>>>
>>>>>>>> >>>> --
>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>> >>>> jbono...@apache.org
>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>> >>>> Talend - http://www.talend.com