It is well-defined, but in the global window anything that has only 1 pane is not that useful in streaming pipelines :)
Another well-defined thing: "watermark of the signal + its allowed lateness passes the end of the main input's window". Hmm, I think this can be implemented by simply rewindowing the signal into the main input's WindowFn? Perhaps we should even always do this?

On Fri, Dec 22, 2017 at 8:11 AM Reuven Lax <re...@google.com> wrote:

> I think it is also well-defined for the on-time (watermark) pane, as there is always only one such. In general I think sequencing is well defined for three panes:
>
> 1. The first pane fired (this is how side inputs are sequenced)
> 2. The first on-time pane, as there is only one.
> 3. The final pane.
>
> On Thu, Dec 21, 2017 at 5:49 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> Yeah. And I don't think there's a good way to define what sequencing even means, if the sink is returning results in windows that aren't gonna have a final pane.
>>
>> On Thu, Dec 21, 2017, 2:00 AM Reuven Lax <re...@google.com> wrote:
>>
>>> This is only for "final pane" waiting, correct? So someone who writes a sink in the global window probably would not want to use this.
>>>
>>> On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> PR is out: https://github.com/apache/beam/pull/4301
>>>>
>>>> This should allow us to have useful sequencing for sinks like BigtableIO / BigQueryIO.
>>>>
>>>> Adding a couple of interested parties:
>>>> - Steve, would you be interested in using this in https://github.com/apache/beam/pull/3997 ?
>>>> - Mairbek: this should help in https://github.com/apache/beam/pull/4264 - in particular, this works properly in case the input can be firing multiple times.
>>>>
>>>> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>
>>>>> I figured out the Never.ever() approach and it seems to work. Will finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this will be quite a useful transform.
>>>>>
>>>>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>
>>>>>> I'm a bit confused by all of these suggestions: they sound plausible at a high level, but I'm having a hard time making any one of them concrete.
>>>>>>
>>>>>> So suppose we want to create a transform Wait.on(PCollection<?> signal): PCollection<T> -> PCollection<T>.
>>>>>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to "a", but buffers panes of "a" in any given window until the final pane of "sig" in the same window is fired (or, if it's never fired, until the window closes? could use a deadletter for that maybe).
>>>>>>
>>>>>> This transform I suppose would need to have a keyed and unkeyed version.
>>>>>>
>>>>>> The keyed version would support merging window fns, would require "a" and "sig" to be keyed by the same key, and would work using a CoGbk - followed by a stateful ParDo? Or is there a way to get away without a stateful ParDo here? (not all runners support it)
>>>>>>
>>>>>> The unkeyed version would not support merging window fns. Reuven, can you elaborate how your combiner idea would work here - in particular, what do you mean by "triggering only on the final pane"? Do you mean filter non-final panes before entering the combiner? I wonder if that'll work, probably worth a shot. And Kenn, can you elaborate on "re-trigger on the side input with a Never.ever() trigger"?
>>>>>>
>>>>>> Thanks.
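For concreteness, a minimal sketch in Java of the usage shape discussed above, assuming the Wait.on API from PR 4301 lands as described; the follow-up DoFn and all names here are illustrative, not part of the proposal:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    class WaitOnExample {
      // Hold back "main" until the final pane of "signal" in the
      // corresponding window has fired, then run a follow-up step.
      static PCollection<String> afterSignal(
          PCollection<String> main, PCollection<?> signal) {
        return main
            .apply("WaitOnSignal", Wait.on(signal))
            .apply("RunAfterSignal", ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // By the time this runs, the signal's final pane for
                // the corresponding window has fired.
                c.output(c.element());
              }
            }));
      }
    }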
>>>>>>
>>>>>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> This is an interesting point.
>>>>>>>
>>>>>>> In the past, we've often just thought about sequencing some action to take place after the sink, in which case you can simply use the sink output as a main input. However, if you want to run a transform with another PCollection as a main input, this doesn't work. And as you've discovered, triggered side inputs are defined to be non-deterministic, and there's no way to make things line up.
>>>>>>>
>>>>>>> What you're describing only makes sense if you're blocking against the final pane (since otherwise there's no reasonable way to match up somePC panes with the sink panes). There are multiple ways you can do this: one would be to CoGBK the two PCollections together, and trigger the new transform only on the final pane. Another would be to add a combiner that returns a Void, triggering only on the final pane, and then make this singleton Void a side input. You could also do something explicit with the state API.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> So this appears not as easy as anticipated (surprise!)
>>>>>>>>
>>>>>>>> Suppose we have a PCollection "donePanes" with an element per fully-processed pane: e.g. a BigQuery sink, and elements saying "a pane of data has been written; this pane is: final / non-final".
>>>>>>>>
>>>>>>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn)) happens only after the final pane has been written.
>>>>>>>>
>>>>>>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen when c emits a *final* pane.
>>>>>>>>
>>>>>>>> Unfortunately, using ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do the trick: the side input becomes ready the moment *the first* pane of data has been written.
>>>>>>>>
>>>>>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final panes...).apply(View.asSingleton())). It also becomes ready the moment *the first* pane has been written; you just get an exception if you access the side input before the *final* pane was written.
>>>>>>>>
>>>>>>>> I can't think of a pure-Beam solution to this: either "donePanes" will be used as a main input to something (and then everything else can only be a side input, which is not general enough), or it will be used as a side input (and then we can't achieve "trigger only after the final pane fires").
>>>>>>>>
>>>>>>>> It seems that we need a way to control the side input pushback, and configure whether a view becomes ready when its first pane has fired or when its last pane has fired. I could see this being a property on the View transform itself. In terms of implementation - I tried to figure out how side input readiness is determined, in the direct runner and the Dataflow runner, and I'm completely lost and would appreciate some help.
>>>>>>>>
>>>>>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> This sounds great!
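A sketch of the "filter only final panes" step described in Eugene's message above, with WriteStatus as a hypothetical element type; note that, per the thread, this changes what the view contains but not when the side input first becomes ready:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // WriteStatus is a hypothetical element type meaning "a pane of
    // data has been written". Keep only elements emitted in a
    // window's final pane (PaneInfo.isLast() in the SDK).
    PCollection<WriteStatus> finalPanesOnly = donePanes.apply(
        "KeepFinalPanes", ParDo.of(new DoFn<WriteStatus, WriteStatus>() {
          @ProcessElement
          public void process(ProcessContext c) {
            if (c.pane().isLast()) {
              c.output(c.element());
            }
          }
        }));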
>>>>>>>>>
>>>>>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers <bchamb...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> This would be absolutely great! It seems somewhat similar to the changes that were made to the BigQuery sink to support WriteResult (https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).
>>>>>>>>>>
>>>>>>>>>> I find it helpful to think about the different things that may come after a sink. For instance:
>>>>>>>>>>
>>>>>>>>>> 1. It might be helpful to have a collection of failing input elements. The type of failed elements is pretty straightforward -- just the input elements. This allows handling such failures by directing them elsewhere or performing additional processing.
>>>>>>>>>
>>>>>>>>> BigQueryIO already does this, as you point out.
>>>>>>>>>
>>>>>>>>>> 2. For a sink that produces a series of files, it might be useful to have a collection of the file names that have been completely written. This allows performing additional handling on these completed segments.
>>>>>>>>>
>>>>>>>>> In fact we already do this for FileBasedSinks. See https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>>>>>>>
>>>>>>>>>> 3. For a sink that updates some destination, it would be reasonable to have a collection that provides (periodically) output indicating how complete the information written to that destination is. For instance, this might be something like "<this bigquery table> has all of the elements up to <input watermark>" complete. This allows tracking how much information has been completely written out.
>>>>>>>>>
>>>>>>>>> Interesting. Maybe tough to do, since sinks often don't have that knowledge.
>>>>>>>>>
>>>>>>>>>> I think those concepts map to the more detailed description Eugene provided, but I find it helpful to focus on what information comes out of the sink and how it might be used.
>>>>>>>>>>
>>>>>>>>>> Were there any use cases the above miss? Any functionality that has been described that doesn't map to these use cases?
>>>>>>>>>>
>>>>>>>>>> -- Ben
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> It makes sense to consider how this maps onto existing kinds of sinks. E.g.:
>>>>>>>>>>> - Something that just makes an RPC per record, e.g. MqttIO.write(): that will emit 1 result per bundle (either a bogus value or the number of records written) that will be Combine'd into 1 result per pane of input. A user can sequence against this and be notified when some intermediate amount of data has been written for a window, or (via .isFinal()) when all of it has been written.
>>>>>>>>>>> - Something that e.g. initiates an import job, such as BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic index swap: should emit 1 result per import job, e.g. containing information about the job (e.g. its id and statistics). The role of panes is the same.
>>>>>>>>>>> - Something like the above but that supports dynamic destinations: like in WriteFiles, the result will be PCollection<KV<DestinationT, ResultT>>, where ResultT may be something like a list of files that were written for this pane of this destination.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I agree that the proper API for enabling the use case "do something after the data has been written" is to return a PCollection of objects where each object represents the result of writing some identifiable subset of the data. Then one can apply a ParDo to this PCollection, in order to "do something after this subset has been written".
>>>>>>>>>>>>
>>>>>>>>>>>> The challenging part here is *identifying* the subset of the data that's been written, in a way consistent with Beam's unified batch/streaming model, where saying "all data has been written" is not an option because more data can arrive.
>>>>>>>>>>>>
>>>>>>>>>>>> The next choice is "a window of input has been written", but then again, late data can arrive into a window as well.
>>>>>>>>>>>>
>>>>>>>>>>>> The next choice after that is "a pane of input has been written", but per https://s.apache.org/beam-sink-triggers the term "pane of input" is moot: triggering and panes should be something private to the sink, and the same input can trigger different sinks differently. The hypothetical different accumulation modes make this trickier still. I'm not sure whether we intend to also challenge the idea that windowing is inherent to the collection, or whether it too should be specified on a transform that processes the collection. I think for the sake of this discussion we can assume that it's inherent, and assume the mental model that the elements in different windows of a PCollection are processed independently - "as if" there were multiple pipelines processing each window.
>>>>>>>>>>>>
>>>>>>>>>>>> Overall, embracing the full picture, we end up with something like this:
>>>>>>>>>>>> - The input PCollection is a composition of windows.
>>>>>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or sliding windows), the below applies to the entire contents of the PCollection. If it's merging (e.g. session windows), then it applies per-key, and the input should be (perhaps implicitly) keyed in a way that the sink understands - for example, the grouping by destination in DynamicDestinations in file and bigquery writes.
>>>>>>>>>>>> - Each window's contents is a "changelog" - a stream of elements and retractions.
>>>>>>>>>>>> - A "sink" processes each window of the collection, deciding how to handle elements and retractions (and whether to support retractions at all) in a sink-specific way, and deciding *when* to perform the side effects for a portion of the changelog (a "pane") based on the sink's triggering strategy.
>>>>>>>>>>>> - If the side effect itself is parallelized, then there'll be multiple results for the pane - e.g. one per bundle.
>>>>>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a list of filenames that have been written, or simply the number of records that were written, or a bogus void value, etc. The result will implicitly include the window of the input it's associated with. It will also implicitly include pane information - the index of the pane in this window, and whether this is the first or last pane.
>>>>>>>>>>>> - The partitioning into bundles is an implementation detail and not very useful, so before presenting the pane write results to the user, the sink will probably want to Combine the bundle results so that there ends up being 1 value for each pane that was written. Once again, note that panes may be associated with windows of the input as a whole, but if the input is keyed (like with DynamicDestinations) they'll be associated with per-key subsets of windows of the input.
>>>>>>>>>>>> - This combining requires an extra, well, combining operation, so it should be optional.
>>>>>>>>>>>> - The user will end up getting either a PCollection<ResultT> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where the elements of this collection will implicitly have window and pane information, available via the implicit BoundedWindow and PaneInfo.
>>>>>>>>>>>> - Until "sink triggering" is implemented, we'll have to embrace the fact that the trigger strategy is set on the input. But in that case the user will have to accept that the PaneInfo of ResultT's is not necessarily directly related to panes of the input - the sink is allowed to do internal aggregation as an implementation detail, which may modify the triggering strategy. Basically, the user will still get sink-assigned panes.
>>>>>>>>>>>> - In most cases, one may imagine that the user is interested in being notified that "no more data associated with this window will be written", so the user will ignore all ResultT's except those where the pane is marked final. If a user is interested in being notified of intermediate write results, they'll have to embrace the fact that they cannot identify the precise subset of input associated with the intermediate result.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the really key points of the above are:
>>>>>>>>>>>> - Sinks should support windowed input. Sinks should write different windows of input independently. If the sink can write multi-destination input, the destination should function as a grouping key, and in that case merging windowing should be allowed.
>>>>>>>>>>>> - Producing a PCollection of write results should be optional.
>>>>>>>>>>>> - When asked to produce results, sinks produce a PCollection of results that may be keyed or unkeyed (per above), are placed in the window of the input that was written, and have a PaneInfo assigned by the sink, of which probably the only part useful to the user is whether it's .isFinal().
>>>>>>>>>>>>
>>>>>>>>>>>> Does this sound reasonable?
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +1
>>>>>>>>>>>>>
>>>>>>>>>>>>> At the very least, an empty PCollection<?> could be produced with no promises about its contents but the ability to be followed (e.g. as a side input), which is forward compatible with whatever actual metadata one may decide to produce in the future.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>> > +dev@
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are easy to understand and a good way for an IO to communicate and establish causal dependencies. Converting an IO from PDone to real output may spur further useful thoughts based on the design decisions about what sort of output is most useful.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Kenn
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> I think all sinks actually do have valuable information to output which can be used after a write (file names, transaction/commit/row ids, table names, ...). In addition to this metadata, having a PCollection of all successful writes and all failed writes is useful for users so they can chain an action which depends on what was or wasn't successfully written. Users have requested adding retry/failure handling policies to sinks so that failed writes don't jam up the pipeline.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> So I agree generally with the idea that returning a PCollection makes all of this easier so that arbitrary additional functions can be added, but what exactly would write functions return in a PCollection that would make sense? The whole idea is that we've written to an external source and now the collection itself is no longer needed.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Currently, that's represented with a PDone, but that doesn't allow any work to occur after it.
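Tying back to Eugene's key points above, a sketch of the consumer side: a downstream step that reacts only to a sink result pane marked final. The sink's result type is assumed to be String for illustration, and the ".isFinal()" in the discussion is written here as PaneInfo.isLast(), the corresponding method in the SDK:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.PCollection;

    // "results" is an assumed PCollection<String> produced by such a
    // sink. Ignore intermediate write results; act once per window
    // when the sink reports its final pane.
    PCollection<Void> done = results.apply(
        "OnFinalPane", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void process(ProcessContext c, BoundedWindow window) {
            if (c.pane().isLast()) {
              // No more data associated with "window" will be written.
            }
          }
        }));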
>>>>>>>>>>>>> >>> I see a couple of possible ways of handling this given this conversation, and am curious which solution sounds like the best way to deal with the problem:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> 1. Have output transforms always return something specific (which would be the same across transforms by convention), in the form of a PCollection, so operations can occur after it.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a PCollection, so we can run applies afterward.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> 3. Make output transforms provide the facility for a callback function which runs after the transform is complete.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and the solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that particular method requires the output transform in question to return a PCollection, even if it is trivial or doesn't make sense to return one. This seems like a bad solution, but unfortunately there isn't a notion of a transform that has no explicit output but needs to have operations occur after it.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also start up a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Chet
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Instead of a callback fn, it's most useful if a PCollection is returned containing the result of the sink so that any arbitrary additional functions can be applied.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more than in the main.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Regards
>>>>>>>>>>>>> >>>> JB
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> I do something almost exactly like this, but with BigtableIO instead. I have a pull request open here [1] (which reminds me I need to finish this up...). It would really be nice for most IOs to support something like this.
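A shape-only sketch of option 1 in Chet's list above, with every name hypothetical: the write transform returns a PCollection describing what was written, instead of PDone, so further applies can be sequenced against it:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // A write transform that emits one element per completed write
    // rather than ending the pipeline with PDone.
    class WriteToFoo extends PTransform<PCollection<String>, PCollection<String>> {
      @Override
      public PCollection<String> expand(PCollection<String> input) {
        return input.apply("WriteAndReport",
            ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // ... perform the external write for c.element() here ...
                c.output(c.element()); // report what was written
              }
            }));
      }
    }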
>>>>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from the BigtableIO, and then feed that into your function which will run when all writes finish.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> You probably want to avoid doing something in the main method because there's no guarantee it'll actually run (maybe the driver will die, get killed, machine will explode, etc).
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Assuming you're in Java, you could just follow on in your main method, checking the state of the result.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Example:
>>>>>>>>>>>>> >>>>> PipelineResult result = pipeline.run();
>>>>>>>>>>>>> >>>>> try {
>>>>>>>>>>>>> >>>>>   result.waitUntilFinish();
>>>>>>>>>>>>> >>>>>   if (result.getState() == PipelineResult.State.DONE) {
>>>>>>>>>>>>> >>>>>     // do ES work
>>>>>>>>>>>>> >>>>>   }
>>>>>>>>>>>>> >>>>> } catch (Exception e) {
>>>>>>>>>>>>> >>>>>   result.cancel();
>>>>>>>>>>>>> >>>>>   throw e;
>>>>>>>>>>>>> >>>>> }
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Otherwise you could also use Oozie to construct a workflow.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Hi,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> yes, we had a similar question some days ago.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> We can imagine having a user callback fn fired when the sink batch is complete.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Let me think about that.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Regards
>>>>>>>>>>>>> >>>>> JB
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Hey JB,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Thanks for getting back so quickly.
>>>>>>>>>>>>> >>>>> I suppose in that case I would need a way of monitoring when the ES transform completes successfully before I can proceed with doing the swap.
>>>>>>>>>>>>> >>>>> The problem with this is that I can't think of a good way to determine that termination state short of polling the new index to check the document count compared to the size of the input PCollection.
>>>>>>>>>>>>> >>>>> That, or maybe I'd need to use an external system like you mentioned to poll on the state of the pipeline (I'm using Google Dataflow, so maybe there's a way to do this with some API).
>>>>>>>>>>>>> >>>>> But I would have thought that there would be an easy way of simply saying "do not process this transform until this other transform completes".
>>>>>>>>>>>>> >>>>> Is there no established way of "signaling" between pipelines when some pipeline completes, or some way of declaring a dependency of one transform on another?
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Thanks again,
>>>>>>>>>>>>> >>>>> Philip
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Hi Philip,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> You won't be able to do (3) in the same pipeline, as the Elasticsearch sink PTransform ends the pipeline with PDone.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> So, (3) has to be done in another pipeline (using a DoFn) or in another "system" (like Camel, for instance). I would do a check of the data in the index and then trigger the swap there.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Regards
>>>>>>>>>>>>> >>>>> JB
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Hi,
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> I'm pretty new to Beam, and I've been trying to use the ElasticSearchIO sink to write docs into ES.
>>>>>>>>>>>>> >>>>> With this, I want to be able to
>>>>>>>>>>>>> >>>>> 1. ingest and transform rows from a DB (done)
>>>>>>>>>>>>> >>>>> 2. write JSON docs/strings into a new ES index (done)
>>>>>>>>>>>>> >>>>> 3. after (2) is complete and all documents are written into a new index, trigger an atomic index swap under an alias to replace the current aliased index with the new index generated in step 2. This is basically a single POST request to the ES cluster.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> The problem I'm facing is that I don't seem to be able to find a way for (3) to happen after step (2) is complete.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> The ElasticSearchIO.Write transform returns a PDone, and I'm not sure how to proceed from there because it doesn't seem to let me do another apply on it to "define" a dependency.
>>>>>>>>>>>>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Is there a recommended way to construct pipeline workflows like this?
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> Thanks in advance,
>>>>>>>>>>>>> >>>>> Philip
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> --
>>>>>>>>>>>>> >>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> >>>>> jbono...@apache.org
>>>>>>>>>>>>> >>>>> http://blog.nanthrax.net
>>>>>>>>>>>>> >>>>> Talend - http://www.talend.com
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> --
>>>>>>>>>>>>> >>>>> Nick Verbeck - NerdyNick
>>>>>>>>>>>>> >>>>> ----------------------------------------------------
>>>>>>>>>>>>> >>>>> NerdyNick.com
>>>>>>>>>>>>> >>>>> TrailsOffroad.com
>>>>>>>>>>>>> >>>>> NoKnownBoundaries.com
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> --
>>>>>>>>>>>>> >>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> >>>> jbono...@apache.org
>>>>>>>>>>>>> >>>> http://blog.nanthrax.net
>>>>>>>>>>>>> >>>> Talend - http://www.talend.com
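To close the loop, a sketch of the pattern Steve Niemitz describes earlier in the thread: combine the sink's per-write output down to a single signal, then hang the follow-up work (e.g. the Elasticsearch index swap) off that signal. It assumes a sink that emits one element per completed write, with String results for illustration:

    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    // "writeResults" is an assumed PCollection<String> from the sink.
    // Collapse all write results in a window into one element, then
    // run the follow-up once that element is produced.
    writeResults
        .apply("AllWritesDone",
            Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply("RunFollowUp", ParDo.of(new DoFn<Long, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Runs only after the writes feeding this pane have
            // completed, e.g. trigger the atomic index swap here.
          }
        }));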