PR is out https://github.com/apache/beam/pull/4301
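For those catching up on the thread, usage looks roughly like this (a minimal sketch based on the PR; MyWrite and DoSomethingAfterWrite are hypothetical stand-ins for a sink that returns a result PCollection and for the follow-up step):

    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    // The sink emits some result collection ("signal"), e.g. one element per pane written.
    PCollection<MyResult> signal = input.apply("WriteToSink", new MyWrite());

    // Elements of "input" are held back per window until the corresponding
    // window of "signal" is done firing, then the follow-up step runs.
    input.apply(Wait.on(signal))
         .apply("AfterWrite", ParDo.of(new DoSomethingAfterWrite()));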
This should allow us to have useful sequencing for sinks like BigtableIO / BigQueryIO. Adding a couple of interested parties:
- Steve, would you be interested in using this in https://github.com/apache/beam/pull/3997 ?
- Mairbek: this should help in https://github.com/apache/beam/pull/4264 - in particular, this works properly even when the input fires multiple times.

On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> I figured out the Never.ever() approach and it seems to work. Will finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this will be quite a useful transform.
>
> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> I'm a bit confused by all of these suggestions: they sound plausible at a high level, but I'm having a hard time making any one of them concrete.
>>
>> So suppose we want to create a transform Wait.on(PCollection<?> signal): PCollection<T> -> PCollection<T>. a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to "a", but buffers panes of "a" in any given window until the final pane of "sig" in the same window has fired (or, if it never fires, until the window closes? we could use a dead-letter output for that, maybe).
>>
>> This transform, I suppose, would need to have a keyed and an unkeyed version.
>>
>> The keyed version would support merging window fns, would require "a" and "sig" to be keyed by the same key, and would work using a CoGbk - followed by a stateful ParDo? Or is there a way to get away without a stateful ParDo here? (Not all runners support it.)
>>
>> The unkeyed version would not support merging window fns. Reuven, can you elaborate on how your combiner idea would work here - in particular, what do you mean by "triggering only on the final pane"? Do you mean filtering out non-final panes before entering the combiner? I wonder if that'll work; probably worth a shot. And Kenn, can you elaborate on "re-trigger on the side input with a Never.ever() trigger"?
>>
>> Thanks.
>>
>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>
>>> This is an interesting point.
>>>
>>> In the past, we've often just thought about sequencing some action to take place after the sink, in which case you can simply use the sink output as a main input. However, if you want to run a transform with another PCollection as a main input, this doesn't work. And as you've discovered, triggered side inputs are defined to be non-deterministic, and there's no way to make things line up.
>>>
>>> What you're describing only makes sense if you're blocking against the final pane (since otherwise there's no reasonable way to match up somePC panes with the sink panes). There are multiple ways you can do this: one would be to CoGbk the two PCollections together, and trigger the new transform only on the final pane. Another would be to add a combiner that returns a Void, triggering only on the final pane, and then make this singleton Void a side input. You could also do something explicit with the state API.
>>>
>>> Reuven
>>>
>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> So this appears not as easy as anticipated (surprise!)
>>>>
>>>> Suppose we have a PCollection "donePanes" with an element per fully-processed pane: e.g. a BigQuery sink, with elements saying "a pane of data has been written; this pane is: final / non-final".
>>>>
>>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn)) happens only after the final pane has been written.
>>>>
>>>> In other words: we want a.apply(ParDo.of(b).withSideInputs(c)) to happen when c emits a *final* pane.
>>>>
>>>> Unfortunately, using ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do the trick: the side input becomes ready the moment *the first* pane of data has been written.
>>>>
>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final panes...).apply(View.asSingleton())). It also becomes ready the moment *the first* pane has been written; you just get an exception if you access the side input before the *final* pane has been written.
>>>>
>>>> I can't think of a pure-Beam solution to this: either "donePanes" will be used as a main input to something (and then everything else can only be a side input, which is not general enough), or it will be used as a side input (and then we can't achieve "trigger only after the final pane fires").
>>>>
>>>> It seems that we need a way to control the side input pushback, and to configure whether a view becomes ready when its first pane has fired or when its last pane has fired. I could see this being a property on the View transform itself. In terms of implementation - I tried to figure out how side input readiness is determined in the direct runner and the Dataflow runner, and I'm completely lost and would appreciate some help.
>>>>
>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> This sounds great!
>>>>>
>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org> wrote:
>>>>>
>>>>>> This would be absolutely great! It seems somewhat similar to the changes that were made to the BigQuery sink to support WriteResult (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>>>>>>
>>>>>> I find it helpful to think about the different things that may come after a sink. For instance:
>>>>>>
>>>>>> 1. It might be helpful to have a collection of failing input elements. The type of failed elements is pretty straightforward -- just the input elements. This allows handling such failures by directing them elsewhere or performing additional processing.
>>>>>
>>>>> BigQueryIO already does this, as you point out.
>>>>>
>>>>>> 2. For a sink that produces a series of files, it might be useful to have a collection of the file names that have been completely written. This allows performing additional handling on these completed segments.
>>>>>
>>>>> In fact we already do this for FileBasedSinks. See https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>
>>>>>> 3. For a sink that updates some destination, it would be reasonable to have a collection that provides (periodic) output indicating how complete the information written to that destination is. For instance, this might be something like "<this bigquery table> has all of the elements up to <input watermark>". This allows tracking how much information has been completely written out.
>>>>>
>>>>> Interesting.
>>>>> Maybe tough to do, since sinks often don't have that knowledge.
>>>>>
>>>>>> I think those concepts map to the more detailed description Eugene provided, but I find it helpful to focus on what information comes out of the sink and how it might be used.
>>>>>>
>>>>>> Are there any use cases the above miss? Any functionality that has been described that doesn't map to these use cases?
>>>>>>
>>>>>> -- Ben
>>>>>>
>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>
>>>>>>> It makes sense to consider how this maps onto existing kinds of sinks. E.g.:
>>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write(): that will emit 1 result per bundle (either a bogus value or the number of records written), which will be Combine'd into 1 result per pane of input. A user can sequence against this and be notified when some intermediate amount of data has been written for a window, or (via .isFinal()) when all of it has been written.
>>>>>>> - Something that initiates an import job, such as BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic index swap: should emit 1 result per import job, e.g. containing information about the job (its id, statistics, etc.). The role of panes is the same.
>>>>>>> - Something like the above that also supports dynamic destinations: as in WriteFiles, the result will be a PCollection<KV<DestinationT, ResultT>>, where ResultT may be something like the list of files that were written for this pane of this destination.
>>>>>>>
>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> I agree that the proper API for enabling the use case "do something after the data has been written" is to return a PCollection of objects where each object represents the result of writing some identifiable subset of the data. Then one can apply a ParDo to this PCollection in order to "do something after this subset has been written".
>>>>>>>>
>>>>>>>> The challenging part here is *identifying* the subset of the data that's been written, in a way consistent with Beam's unified batch/streaming model, where saying "all data has been written" is not an option because more data can arrive.
>>>>>>>>
>>>>>>>> The next choice is "a window of input has been written", but then again, late data can arrive into a window as well.
>>>>>>>>
>>>>>>>> The next choice after that is "a pane of input has been written", but per https://s.apache.org/beam-sink-triggers the term "pane of input" is moot: triggering and panes should be something private to the sink, and the same input can trigger different sinks differently. The hypothetical different accumulation modes make this trickier still. I'm not sure whether we intend to also challenge the idea that windowing is inherent to the collection, or whether it too should be specified on a transform that processes the collection.
>>>>>>>> I think for the sake of this discussion we can assume that it's inherent, and assume the mental model that the elements in different windows of a PCollection are processed independently - "as if" there were multiple pipelines processing each window.
>>>>>>>>
>>>>>>>> Overall, embracing the full picture, we end up with something like this:
>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding windows), the below applies to the entire contents of the PCollection. If it's merging (e.g. session windows), then it applies per-key, and the input should be (perhaps implicitly) keyed in a way that the sink understands - for example, the grouping by destination in DynamicDestinations in file and bigquery writes.
>>>>>>>> - Each window's contents is a "changelog" - a stream of elements and retractions.
>>>>>>>> - A "sink" processes each window of the collection, deciding how to handle elements and retractions (and whether to support retractions at all) in a sink-specific way, and deciding *when* to perform the side effects for a portion of the changelog (a "pane") based on the sink's triggering strategy.
>>>>>>>> - If the side effect itself is parallelized, then there'll be multiple results for the pane - e.g. one per bundle.
>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of filenames that have been written, or simply the number of records that were written, or a bogus void value, etc. The result will implicitly include the window of the input it's associated with. It will also implicitly include pane information - the index of the pane in this window, and whether this is the first or last pane.
>>>>>>>> - The partitioning into bundles is an implementation detail and not very useful, so before presenting the pane write results to the user, the sink will probably want to Combine the bundle results so that there ends up being 1 value for each pane that was written. Once again, note that panes may be associated with windows of the input as a whole, but if the input is keyed (like with DynamicDestinations) they'll be associated with per-key subsets of windows of the input.
>>>>>>>> - This combining requires an extra, well, combining operation, so it should be optional.
>>>>>>>> - The user will end up getting either a PCollection<ResultT> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where the elements of this collection will implicitly have window and pane information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace the fact that the trigger strategy is set on the input. But in that case the user will have to accept that the PaneInfo of the ResultT's is not necessarily directly related to the panes of the input - the sink is allowed to do internal aggregation as an implementation detail, which may modify the triggering strategy. Basically, the user will still get sink-assigned panes.
>>>>>>>> - In most cases, one may imagine that the user is interested in being notified that "no more data associated with this window will be written", so the user will ignore all ResultT's except those where the pane is marked final. If a user is interested in being notified of intermediate write results, they'll have to embrace the fact that they cannot identify the precise subset of input associated with the intermediate result.
>>>>>>>>
>>>>>>>> I think the really key points of the above are:
>>>>>>>> - Sinks should support windowed input. Sinks should write different windows of input independently. If the sink can write multi-destination input, the destination should function as a grouping key, and in that case merging windowing should be allowed.
>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>> - When asked to produce results, sinks produce a PCollection of results that may be keyed or unkeyed (per above), are placed in the window of the input that was written, and have a PaneInfo assigned by the sink, of which probably the only part useful to the user is whether it's .isFinal() (a rough sketch of this pattern is below).
>>>>>>>>
>>>>>>>> Does this sound reasonable?
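>>>>>>>>
>>>>>>>> A minimal sketch of acting only on final panes of such a results collection - Result is a placeholder for the sink-specific result type, and note the SDK spells the final-pane check PaneInfo.isLast():
>>>>>>>>
>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>
>>>>>>>> PCollection<Result> finalOnly = writeResults.apply("FinalPanesOnly",
>>>>>>>>     ParDo.of(new DoFn<Result, Result>() {
>>>>>>>>       @ProcessElement
>>>>>>>>       public void processElement(ProcessContext c) {
>>>>>>>>         // isLast() is true only for the final pane of a window.
>>>>>>>>         if (c.pane().isLast()) {
>>>>>>>>           c.output(c.element());
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>     }));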
>>>>>>>>
>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> +1
>>>>>>>>>
>>>>>>>>> At the very least, an empty PCollection<?> could be produced with no promises about its contents but the ability to be followed (e.g. as a side input), which is forward compatible with whatever actual metadata one may decide to produce in the future.
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>> > +dev@
>>>>>>>>> >
>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to understand and a good way for an IO to communicate and establish causal dependencies. Converting an IO from PDone to real output may spur further useful thoughts based on the design decisions about what sort of output is most useful.
>>>>>>>>> >
>>>>>>>>> > Kenn
>>>>>>>>> >
>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> I think all sinks actually do have valuable information to output which can be used after a write (file names, transaction/commit/row ids, table names, ...). In addition to this metadata, having a PCollection of all successful writes and all failed writes is useful, so that users can chain an action which depends on what was or wasn't successfully written. Users have requested adding retry/failure-handling policies to sinks so that failed writes don't jam up the pipeline.
>>>>>>>>> >>
>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> So I agree generally with the idea that returning a PCollection makes all of this easier, so that arbitrary additional functions can be added - but what exactly would write functions return in a PCollection that would make sense? The whole idea is that we've written to an external system and now the collection itself is no longer needed.
>>>>>>>>> >>>
>>>>>>>>> >>> Currently, that's represented with a PDone, which doesn't allow any work to occur after it. I see a few possible ways of handling this given this conversation, and am curious which sounds like the best way to deal with the problem:
>>>>>>>>> >>>
>>>>>>>>> >>> 1. Have output transforms always return something specific (which would be the same across transforms by convention), in the form of a PCollection, so operations can occur after it.
>>>>>>>>> >>>
>>>>>>>>> >>> 2. Make either PDone or some new type able to act as a PCollection, so we can run applies afterward.
>>>>>>>>> >>>
>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback function which runs after the transform is complete.
>>>>>>>>> >>>
>>>>>>>>> >>> I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and the solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that that particular method requires the output transform in question to return a PCollection, even if it is trivial or doesn't make sense to return one. This seems like a bad solution, but unfortunately there isn't a notion of a transform that has no explicit output yet needs to have operations occur after it.
>>>>>>>>> >>>
>>>>>>>>> >>> The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also start up a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.
>>>>>>>>> >>>
>>>>>>>>> >>> Chet
>>>>>>>>> >>>
>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is returned containing the result of the sink, so that any arbitrary additional functions can be applied.
>>>>>>>>> >>>
>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO rather than in the main.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Regards
>>>>>>>>> >>>> JB
>>>>>>>>> >>>>
>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO instead. I have a pull request open here [1] (which reminds me I need to finish this up...). It would really be nice for most IOs to support something like this.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from the BigtableIO, and then feed that into your function which will run when all writes finish (see the rough sketch below).
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> You probably want to avoid doing something in the main method, because there's no guarantee it'll actually run (maybe the driver will die, get killed, the machine will explode, etc.).
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
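>>>>>>>>> >>>>>
>>>>>>>>> >>>>> A rough sketch of that pattern, assuming a sink that emits some result per bundle/pane (Result and RunAfterWritesFn are placeholders):
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> import org.apache.beam.sdk.transforms.GroupByKey;
>>>>>>>>> >>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>> >>>>> import org.apache.beam.sdk.transforms.WithKeys;
>>>>>>>>> >>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> PCollection<Result> writeResults = ...; // output of the sink
>>>>>>>>> >>>>> writeResults
>>>>>>>>> >>>>>     // Collapse all results onto a single key so one group holds everything.
>>>>>>>>> >>>>>     .apply(WithKeys.of("writes"))
>>>>>>>>> >>>>>     .apply(GroupByKey.create())
>>>>>>>>> >>>>>     // Runs once per window/pane, after the grouped writes are available.
>>>>>>>>> >>>>>     .apply(ParDo.of(new RunAfterWritesFn()));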
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Assuming you're in Java, you could just follow on in your main method, checking the state of the PipelineResult. Example:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> PipelineResult result = pipeline.run();
>>>>>>>>> >>>>> try {
>>>>>>>>> >>>>>   result.waitUntilFinish();
>>>>>>>>> >>>>>   if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>>> >>>>>     // Do the ES work here.
>>>>>>>>> >>>>>   }
>>>>>>>>> >>>>> } catch (Exception e) {
>>>>>>>>> >>>>>   result.cancel();
>>>>>>>>> >>>>>   throw e;
>>>>>>>>> >>>>> }
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Otherwise you could also use Oozie to construct a workflow.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Hi,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Yes, we had a similar question some days ago.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> We can imagine having a user callback fn fired when the sink batch is complete.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Let me think about that.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Regards
>>>>>>>>> >>>>> JB
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Hey JB,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Thanks for getting back so quickly. I suppose in that case I would need a way of monitoring when the ES transform completes successfully before I can proceed with doing the swap. The problem with this is that I can't think of a good way to determine that termination state short of polling the new index to check the document count compared to the size of the input PCollection. That, or maybe I'd need to use an external system like you mentioned to poll on the state of the pipeline (I'm using Google Dataflow, so maybe there's a way to do this with some API).
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> But I would have thought that there would be an easy way of simply saying "do not process this transform until this other transform completes". Is there no established way of "signaling" between pipelines when some pipeline completes, or some way of declaring a dependency of one transform on another transform?
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Thanks again,
>>>>>>>>> >>>>> Philip
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Hi Philip,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> You won't be able to do (3) in the same pipeline, as the Elasticsearch sink PTransform ends the pipeline with PDone.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> So, (3) has to be done in another pipeline (using a DoFn) or in another "system" (like Camel, for instance). I would do a check of the data in the index and then trigger the swap there.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Regards
>>>>>>>>> >>>>> JB
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Hi,
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I'm pretty new to Beam, and I've been trying to use the ElasticsearchIO sink to write docs into ES. With this, I want to be able to:
>>>>>>>>> >>>>> 1. ingest and transform rows from a DB (done)
>>>>>>>>> >>>>> 2. write JSON docs/strings into a new ES index (done)
>>>>>>>>> >>>>> 3. after (2) is complete and all documents are written into a new index, trigger an atomic index swap under an alias to replace the current aliased index with the new index generated in step 2 - basically a single POST request to the ES cluster.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> The problem I'm facing is that I don't seem to be able to find a way for (3) to happen after step (2) is complete.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> The ElasticsearchIO.Write transform returns a PDone, and I'm not sure how to proceed from there, because it doesn't seem to let me do another apply on it to "define" a dependency.
>>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Is there a recommended way to construct pipeline workflows like this?
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Thanks in advance,
>>>>>>>>> >>>>> Philip
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> --
>>>>>>>>> >>>>> Jean-Baptiste Onofré
>>>>>>>>> >>>>> jbono...@apache.org
>>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> --
>>>>>>>>> >>>>> Nick Verbeck - NerdyNick
>>>>>>>>> >>>>> NerdyNick.com | TrailsOffroad.com | NoKnownBoundaries.com
>>>>>>>>> >>>>
>>>>>>>>> >>>> --
>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>> >>>> jbono...@apache.org
>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>> >>>> Talend - http://www.talend.com