Not sure I understand: Notification when each pane is done (e.g. from a sink) is useful and well defined. Example use case: writing per-pane catalog files containing the list of files written in each pane.
Processing one PCollection after another one has finished processing is somewhat definable. Each will have a unique first pane, a unique on-time pane, and a unique final pane. However, keep in mind that it is not guaranteed that two separate transforms see the same set of late data, so you can't make any assumptions about the elements matching up in the two PCollections.

Reuven

On Fri, Dec 22, 2017 at 9:38 PM, Eugene Kirpichov <kirpic...@google.com> wrote:

> It is well-defined, but in the global window anything that has only 1 pane
> is not that useful in streaming pipelines :)
>
> Another well-defined thing: "watermark of the signal + its allowed
> lateness pass the end of the main input's window".
> Hmm I think this can be implemented by simply rewindowing the signal into
> the main input's WindowFn? Perhaps we should even always do this?
>
> On Fri, Dec 22, 2017 at 8:11 AM Reuven Lax <re...@google.com> wrote:
>
>> I think it is also well-defined for the on-time (watermark) pane, as
>> there is always only one such. In general I think sequencing is well
>> defined for three panes:
>>
>> 1. The first pane fired (this is how side inputs are sequenced)
>> 2. The first on-time pane, as there is only one.
>> 3. The final pane.
>>
>> On Thu, Dec 21, 2017 at 5:49 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Yeah. And I don't think there's a good way to define what sequencing
>>> even means, if the sink is returning results in windows that aren't gonna
>>> have a final pane.
>>>
>>> On Thu, Dec 21, 2017, 2:00 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> This is only for "final pane" waiting, correct? So someone who writes a
>>>> sink in the global window probably would not want to use this.
>>>>
>>>> On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> PR is out https://github.com/apache/beam/pull/4301
>>>>>
>>>>> This should allow us to have useful sequencing for sinks like
>>>>> BigtableIO / BigQueryIO.
>>>>>
>>>>> Adding a couple of interested parties:
>>>>> - Steve, would you be interested in using this in
>>>>> https://github.com/apache/beam/pull/3997 ?
>>>>> - Mairbek: this should help in
>>>>> https://github.com/apache/beam/pull/4264 - in particular, this works
>>>>> properly in case the input can be firing multiple times.
>>>>>
>>>>> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I figured out the Never.ever() approach and it seems to work. Will
>>>>>> finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems
>>>>>> like this will be quite a useful transform.
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm a bit confused by all of these suggestions: they sound plausible
>>>>>>> at a high level, but I'm having a hard time making any one of them
>>>>>>> concrete.
>>>>>>>
>>>>>>> So suppose we want to create a transform Wait.on(PCollection<?>
>>>>>>> signal): PCollection<T> -> PCollection<T>.
>>>>>>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical
>>>>>>> to "a", but buffers panes of "a" in any given window until the final
>>>>>>> pane of "sig" in the same window is fired (or, if it's never fired,
>>>>>>> until the window closes? could use a deadletter for that maybe).
>>>>>>>
>>>>>>> This transform I suppose would need to have a keyed and unkeyed
>>>>>>> version.
>>>>>>>
>>>>>>> The keyed version would support merging window fns, and would
>>>>>>> require "a" and "sig" to be keyed by the same key, and would work
>>>>>>> using a CoGbk - followed by a stateful ParDo? Or is there a way to
>>>>>>> get away without a stateful ParDo here? (not all runners support it)
>>>>>>>
>>>>>>> The unkeyed version would not support merging window fns. Reuven,
>>>>>>> can you elaborate how your combiner idea would work here - in
>>>>>>> particular, what do you mean by "triggering only on the final pane"?
>>>>>>> Do you mean filter non-final panes before entering the combiner? I
>>>>>>> wonder if that'll work, probably worth a shot. And Kenn, can you
>>>>>>> elaborate on "re-trigger on the side input with a Never.ever()
>>>>>>> trigger"?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> This is an interesting point.
>>>>>>>>
>>>>>>>> In the past, we've often just thought about sequencing some action
>>>>>>>> to take place after the sink, in which case you can simply use the
>>>>>>>> sink output as a main input. However if you want to run a transform
>>>>>>>> with another PCollection as a main input, this doesn't work. And as
>>>>>>>> you've discovered, triggered side inputs are defined to be
>>>>>>>> non-deterministic, and there's no way to make things line up.
>>>>>>>>
>>>>>>>> What you're describing only makes sense if you're blocking against
>>>>>>>> the final pane (since otherwise there's no reasonable way to match up
>>>>>>>> somePC panes with the sink panes). There are multiple ways you can do
>>>>>>>> this: one would be to CGBK the two PCollections together, and trigger
>>>>>>>> the new transform only on the final pane. Another would be to add a
>>>>>>>> combiner that returns a Void, triggering only on the final pane, and
>>>>>>>> then make this singleton Void a side input. You could also do
>>>>>>>> something explicit with the state API.
>>>>>>>> >>>>>>>> Reuven >>>>>>>> >>>>>>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov < >>>>>>>> kirpic...@google.com> wrote: >>>>>>>> >>>>>>>>> So this appears not as easy as anticipated (surprise!) >>>>>>>>> >>>>>>>>> Suppose we have a PCollection "donePanes" with an element per >>>>>>>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane >>>>>>>>> of >>>>>>>>> data has been written; this pane is: final / non-final". >>>>>>>>> >>>>>>>>> Suppose we want to use this to ensure that >>>>>>>>> somePc.apply(ParDo.of(fn)) happens only after the final pane has been >>>>>>>>> written. >>>>>>>>> >>>>>>>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to >>>>>>>>> happen when c emits a *final* pane. >>>>>>>>> >>>>>>>>> Unfortunately, using ParDo.of(fn).withSideInputs( >>>>>>>>> donePanes.apply(View.asSingleton())) doesn't do the trick: the >>>>>>>>> side input becomes ready the moment *the first *pane of data has >>>>>>>>> been written. >>>>>>>>> >>>>>>>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter >>>>>>>>> only final panes...).apply(View.asSingleton())). It also becomes >>>>>>>>> ready the moment *the first* pane has been written, you just get >>>>>>>>> an exception if you access the side input before the *final* pane >>>>>>>>> was written. >>>>>>>>> >>>>>>>>> I can't think of a pure-Beam solution to this: either "donePanes" >>>>>>>>> will be used as a main input to something (and then everything else >>>>>>>>> can >>>>>>>>> only be a side input, which is not general enough), or it will be >>>>>>>>> used as a >>>>>>>>> side input (and then we can't achieve "trigger only after the final >>>>>>>>> pane >>>>>>>>> fires"). >>>>>>>>> >>>>>>>>> It seems that we need a way to control the side input pushback, >>>>>>>>> and configure whether a view becomes ready when its first pane has >>>>>>>>> fired or >>>>>>>>> when its last pane has fired. 
I could see this be a property on the >>>>>>>>> View >>>>>>>>> transform itself. In terms of implementation - I tried to figure out >>>>>>>>> how >>>>>>>>> side input readiness is determined, in the direct runner and Dataflow >>>>>>>>> runner, and I'm completely lost and would appreciate some help. >>>>>>>>> >>>>>>>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax <re...@google.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> This sounds great! >>>>>>>>>> >>>>>>>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers < >>>>>>>>>> bchamb...@apache.org> wrote: >>>>>>>>>> >>>>>>>>>>> This would be absolutely great! It seems somewhat similar to the >>>>>>>>>>> changes that were made to the BigQuery sink to support WriteResult ( >>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/ >>>>>>>>>>> google-cloud-platform/src/main/java/org/apache/beam/sdk/ >>>>>>>>>>> io/gcp/bigquery/WriteResult.java). >>>>>>>>>>> >>>>>>>>>>> I find it helpful to think about the different things that may >>>>>>>>>>> come after a sink. For instance: >>>>>>>>>>> >>>>>>>>>>> 1. It might be helpful to have a collection of failing input >>>>>>>>>>> elements. The type of failed elements is pretty straightforward -- >>>>>>>>>>> just the >>>>>>>>>>> input elements. This allows handling such failures by directing them >>>>>>>>>>> elsewhere or performing additional processing. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> BigQueryIO already does this as you point out. >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> 2. For a sink that produces a series of files, it might be >>>>>>>>>>> useful to have a collection of the file names that have been >>>>>>>>>>> completely >>>>>>>>>>> written. This allows performing additional handling on these >>>>>>>>>>> completed >>>>>>>>>>> segments. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> In fact we already do this for FileBasedSinks. 
See >>>>>>>>>> https://github.com/apache/beam/blob/ >>>>>>>>>> 7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/ >>>>>>>>>> main/java/org/apache/beam/sdk/io/WriteFilesResult.java >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> 3. For a sink that updates some destination, it would be >>>>>>>>>>> reasonable to have a collection that provides (periodically) output >>>>>>>>>>> indicating how complete the information written to that destination >>>>>>>>>>> is. For >>>>>>>>>>> instance, this might be something like "<this bigquery table> has >>>>>>>>>>> all of >>>>>>>>>>> the elements up to <input watermark>" complete. This allows >>>>>>>>>>> tracking how >>>>>>>>>>> much information has been completely written out. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Interesting. Maybe tough to do since sinks often don't have that >>>>>>>>>> knowledge. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> I think those concepts map to the more detailed description >>>>>>>>>>> Eugene provided, but I find it helpful to focus on what information >>>>>>>>>>> comes >>>>>>>>>>> out of the sink and how it might be used. >>>>>>>>>>> >>>>>>>>>>> Were there any use cases the above miss? Any functionality that >>>>>>>>>>> has been described that doesn't map to these use cases? >>>>>>>>>>> >>>>>>>>>>> -- Ben >>>>>>>>>>> >>>>>>>>>>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov < >>>>>>>>>>> kirpic...@google.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> It makes sense to consider how this maps onto existing kinds of >>>>>>>>>>>> sinks. >>>>>>>>>>>> >>>>>>>>>>>> E.g.: >>>>>>>>>>>> - Something that just makes an RPC per record, e.g. >>>>>>>>>>>> MqttIO.write(): that will emit 1 result per bundle (either a bogus >>>>>>>>>>>> value or >>>>>>>>>>>> number of records written) that will be Combine'd into 1 result >>>>>>>>>>>> per pane of >>>>>>>>>>>> input. 
A user can sequence against this and be notified when some >>>>>>>>>>>> intermediate amount of data has been written for a window, or (via >>>>>>>>>>>> .isFinal()) when all of it has been written. >>>>>>>>>>>> - Something that e.g. initiates an import job, such as >>>>>>>>>>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up >>>>>>>>>>>> atomic >>>>>>>>>>>> index swap: should emit 1 result per import job, e.g. containing >>>>>>>>>>>> information about the job (e.g. its id and statistics). Role of >>>>>>>>>>>> panes is >>>>>>>>>>>> the same. >>>>>>>>>>>> - Something like above but that supports dynamic destinations: >>>>>>>>>>>> like in WriteFiles, result will be PCollection<KV<DestinationT, >>>>>>>>>>>> ResultT>> >>>>>>>>>>>> where ResultT may be something like a list of files that were >>>>>>>>>>>> written for >>>>>>>>>>>> this pane of this destination. >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov < >>>>>>>>>>>> kirpic...@google.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> I agree that the proper API for enabling the use case "do >>>>>>>>>>>>> something after the data has been written" is to return a >>>>>>>>>>>>> PCollection of >>>>>>>>>>>>> objects where each object represents the result of writing some >>>>>>>>>>>>> identifiable subset of the data. Then one can apply a ParDo to >>>>>>>>>>>>> this >>>>>>>>>>>>> PCollection, in order to "do something after this subset has been >>>>>>>>>>>>> written". >>>>>>>>>>>>> >>>>>>>>>>>>> The challenging part here is *identifying* the subset of the >>>>>>>>>>>>> data that's been written, in a way consistent with Beam's unified >>>>>>>>>>>>> batch/streaming model, where saying "all data has been written" >>>>>>>>>>>>> is not an >>>>>>>>>>>>> option because more data can arrive. >>>>>>>>>>>>> >>>>>>>>>>>>> The next choice is "a window of input has been written", but >>>>>>>>>>>>> then again, late data can arrive into a window as well. 
>>>>>>>>>>>>> >>>>>>>>>>>>> Next choice after that is "a pane of input has been written", >>>>>>>>>>>>> but per https://s.apache.org/beam-sink-triggers the term >>>>>>>>>>>>> "pane of input" is moot: triggering and panes should be something >>>>>>>>>>>>> private >>>>>>>>>>>>> to the sink, and the same input can trigger different sinks >>>>>>>>>>>>> differently. >>>>>>>>>>>>> The hypothetical different accumulation modes make this trickier >>>>>>>>>>>>> still. I'm >>>>>>>>>>>>> not sure whether we intend to also challenge the idea that >>>>>>>>>>>>> windowing is >>>>>>>>>>>>> inherent to the collection, or whether it too should be specified >>>>>>>>>>>>> on a >>>>>>>>>>>>> transform that processes the collection. I think for the sake of >>>>>>>>>>>>> this >>>>>>>>>>>>> discussion we can assume that it's inherent, and assume the >>>>>>>>>>>>> mental model >>>>>>>>>>>>> that the elements in different windows of a PCollection are >>>>>>>>>>>>> processed >>>>>>>>>>>>> independently - "as if" there were multiple pipelines processing >>>>>>>>>>>>> each >>>>>>>>>>>>> window. >>>>>>>>>>>>> >>>>>>>>>>>>> Overall, embracing the full picture, we end up with something >>>>>>>>>>>>> like this: >>>>>>>>>>>>> - The input PCollection is a composition of windows. >>>>>>>>>>>>> - If the windowing strategy is non-merging (e.g. fixed or >>>>>>>>>>>>> sliding windows), the below applies to the entire contents of the >>>>>>>>>>>>> PCollection. If it's merging (e.g. session windows), then it >>>>>>>>>>>>> applies >>>>>>>>>>>>> per-key, and the input should be (perhaps implicitly) keyed in a >>>>>>>>>>>>> way that >>>>>>>>>>>>> the sink understands - for example, the grouping by destination in >>>>>>>>>>>>> DynamicDestinations in file and bigquery writes. >>>>>>>>>>>>> - Each window's contents is a "changelog" - stream of elements >>>>>>>>>>>>> and retractions. 
>>>>>>>>>>>>> - A "sink" processes each window of the collection, deciding >>>>>>>>>>>>> how to handle elements and retractions (and whether to support >>>>>>>>>>>>> retractions >>>>>>>>>>>>> at all) in a sink-specific way, and deciding *when* to perform >>>>>>>>>>>>> the side >>>>>>>>>>>>> effects for a portion of the changelog (a "pane") based on the >>>>>>>>>>>>> sink's >>>>>>>>>>>>> triggering strategy. >>>>>>>>>>>>> - If the side effect itself is parallelized, then there'll be >>>>>>>>>>>>> multiple results for the pane - e.g. one per bundle. >>>>>>>>>>>>> - Each (sink-chosen) pane produces a set of results, e.g. a >>>>>>>>>>>>> list of filenames that have been written, or simply a number of >>>>>>>>>>>>> records >>>>>>>>>>>>> that was written, or a bogus void value etc. The result will >>>>>>>>>>>>> implicitly >>>>>>>>>>>>> include the window of the input it's associated with. It will also >>>>>>>>>>>>> implicitly include pane information - index of the pane in this >>>>>>>>>>>>> window, and >>>>>>>>>>>>> whether this is the first or last pane. >>>>>>>>>>>>> - The partitioning into bundles is an implementation detail >>>>>>>>>>>>> and not very useful, so before presenting the pane write results >>>>>>>>>>>>> to the >>>>>>>>>>>>> user, the sink will probably want to Combine the bundle results >>>>>>>>>>>>> so that >>>>>>>>>>>>> there ends up being 1 value for each pane that was written. Once >>>>>>>>>>>>> again note >>>>>>>>>>>>> that panes may be associated with windows of the input as a >>>>>>>>>>>>> whole, but if >>>>>>>>>>>>> the input is keyed (like with DynamicDestinations) they'll be >>>>>>>>>>>>> associated >>>>>>>>>>>>> with per-key subsets of windows of the input. >>>>>>>>>>>>> - This combining requires an extra, well, combining operation, >>>>>>>>>>>>> so it should be optional. 
>>>>>>>>>>>>> - The user will end up getting either a PCollection<ResultT> >>>>>>>>>>>>> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and >>>>>>>>>>>>> ResultT, >>>>>>>>>>>>> where the elements of this collection will implicitly have window >>>>>>>>>>>>> and pane >>>>>>>>>>>>> information, available via the implicit BoundedWindow and >>>>>>>>>>>>> PaneInfo. >>>>>>>>>>>>> - Until "sink triggering" is implemented, we'll have to >>>>>>>>>>>>> embrace the fact that trigger strategy is set on the input. But >>>>>>>>>>>>> in that >>>>>>>>>>>>> case the user will have to accept that the PaneInfo of ResultT's >>>>>>>>>>>>> is not >>>>>>>>>>>>> necessarily directly related to panes of the input - the sink is >>>>>>>>>>>>> allowed to >>>>>>>>>>>>> do internal aggregation as an implementation detail, which may >>>>>>>>>>>>> modify the >>>>>>>>>>>>> triggering strategy. Basically the user will still get >>>>>>>>>>>>> sink-assigned panes. >>>>>>>>>>>>> - In most cases, one may imagine that the user is interested >>>>>>>>>>>>> in being notified of "no more data associated with this window >>>>>>>>>>>>> will be >>>>>>>>>>>>> written", so the user will ignore all ResultT's except those >>>>>>>>>>>>> where the pane >>>>>>>>>>>>> is marked final. If a user is interested in being notified of >>>>>>>>>>>>> intermediate >>>>>>>>>>>>> write results - they'll have to embrace the fact that they cannot >>>>>>>>>>>>> identify >>>>>>>>>>>>> the precise subset of input associated with the intermediate >>>>>>>>>>>>> result. >>>>>>>>>>>>> >>>>>>>>>>>>> I think the really key points of the above are: >>>>>>>>>>>>> - Sinks should support windowed input. Sinks should write >>>>>>>>>>>>> different windows of input independently. If the sink can write >>>>>>>>>>>>> multi-destination input, the destination should function as a >>>>>>>>>>>>> grouping key, >>>>>>>>>>>>> and in that case merging windowing should be allowed. 
>>>>>>>>>>>>> - Producing a PCollection of write results should be optional. >>>>>>>>>>>>> - When asked to produce results, sinks produce a PCollection >>>>>>>>>>>>> of results that may be keyed or unkeyed (per above), and are >>>>>>>>>>>>> placed in the >>>>>>>>>>>>> window of the input that was written, and have a PaneInfo >>>>>>>>>>>>> assigned by the >>>>>>>>>>>>> sink, of which probably the only part useful to the user is >>>>>>>>>>>>> whether it's >>>>>>>>>>>>> .isFinal(). >>>>>>>>>>>>> >>>>>>>>>>>>> Does this sound reasonable? >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw < >>>>>>>>>>>>> rober...@google.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> +1 >>>>>>>>>>>>>> >>>>>>>>>>>>>> At the very least an empty PCollection<?> could be produced >>>>>>>>>>>>>> with no >>>>>>>>>>>>>> promises about its contents but the ability to be followed >>>>>>>>>>>>>> (e.g. as a >>>>>>>>>>>>>> side input), which is forward compatible with whatever actual >>>>>>>>>>>>>> metadata >>>>>>>>>>>>>> one may decide to produce in the future. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles < >>>>>>>>>>>>>> k...@google.com> wrote: >>>>>>>>>>>>>> > +dev@ >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > I am in complete agreement with Luke. Data dependencies are >>>>>>>>>>>>>> easy to >>>>>>>>>>>>>> > understand and a good way for an IO to communicate and >>>>>>>>>>>>>> establish causal >>>>>>>>>>>>>> > dependencies. Converting an IO from PDone to real output >>>>>>>>>>>>>> may spur further >>>>>>>>>>>>>> > useful thoughts based on the design decisions about what >>>>>>>>>>>>>> sort of output is >>>>>>>>>>>>>> > most useful. 
>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > Kenn >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik < >>>>>>>>>>>>>> lc...@google.com> wrote: >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> I think all sinks actually do have valuable information to >>>>>>>>>>>>>> output which >>>>>>>>>>>>>> >> can be used after a write (file names, >>>>>>>>>>>>>> transaction/commit/row ids, table >>>>>>>>>>>>>> >> names, ...). In addition to this metadata, having a >>>>>>>>>>>>>> PCollection of all >>>>>>>>>>>>>> >> successful writes and all failed writes is useful for >>>>>>>>>>>>>> users so they can >>>>>>>>>>>>>> >> chain an action which depends on what was or wasn't >>>>>>>>>>>>>> successfully written. >>>>>>>>>>>>>> >> Users have requested adding retry/failure handling >>>>>>>>>>>>>> policies to sinks so that >>>>>>>>>>>>>> >> failed writes don't jam up the pipeline. >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich < >>>>>>>>>>>>>> chet.aldr...@postmates.com> >>>>>>>>>>>>>> >> wrote: >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> So I agree generally with the idea that returning a >>>>>>>>>>>>>> PCollection makes all >>>>>>>>>>>>>> >>> of this easier so that arbitrary additional functions can >>>>>>>>>>>>>> be added, what >>>>>>>>>>>>>> >>> exactly would write functions be returning in a >>>>>>>>>>>>>> PCollection that would make >>>>>>>>>>>>>> >>> sense? The whole idea is that we’ve written to an >>>>>>>>>>>>>> external source and now >>>>>>>>>>>>>> >>> the collection itself is no longer needed. >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> Currently, that’s represented with a PDone, but currently >>>>>>>>>>>>>> that doesn’t >>>>>>>>>>>>>> >>> allow any work to occur after it. I see a couple possible >>>>>>>>>>>>>> ways of handling >>>>>>>>>>>>>> >>> this given this conversation, and am curious which >>>>>>>>>>>>>> solution sounds like the >>>>>>>>>>>>>> >>> best way to deal with the problem: >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> 1. 
Have output transforms always return something >>>>>>>>>>>>>> specific (which would >>>>>>>>>>>>>> >>> be the same across transforms by convention), that is in >>>>>>>>>>>>>> the form of a >>>>>>>>>>>>>> >>> PCollection, so operations can occur after it. >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> 2. Make either PDone or some new type that can act as a >>>>>>>>>>>>>> PCollection so we >>>>>>>>>>>>>> >>> can run applies afterward. >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> 3. Make output transforms provide the facility for a >>>>>>>>>>>>>> callback function >>>>>>>>>>>>>> >>> which runs after the transform is complete. >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> I went through these gymnastics recently when I was >>>>>>>>>>>>>> trying to build >>>>>>>>>>>>>> >>> something that would move indices after writing to >>>>>>>>>>>>>> Algolia, and the solution >>>>>>>>>>>>>> >>> was to co-opt code from the old Sink class that used to >>>>>>>>>>>>>> exist in Beam. The >>>>>>>>>>>>>> >>> problem is that particular method requires the output >>>>>>>>>>>>>> transform in question >>>>>>>>>>>>>> >>> to return a PCollection, even if it is trivial or doesn’t >>>>>>>>>>>>>> make sense to >>>>>>>>>>>>>> >>> return one. This seems like a bad solution, but >>>>>>>>>>>>>> unfortunately there isn’t a >>>>>>>>>>>>>> >>> notion of a transform that has no explicit output that >>>>>>>>>>>>>> needs to have >>>>>>>>>>>>>> >>> operations occur after it. >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> The three potential solutions above address this issue, >>>>>>>>>>>>>> but I would like >>>>>>>>>>>>>> >>> to hear on which would be preferable (or perhaps a >>>>>>>>>>>>>> different proposal >>>>>>>>>>>>>> >>> altogether?). Perhaps we could also start up a ticket on >>>>>>>>>>>>>> this, since it >>>>>>>>>>>>>> >>> seems like a worthwhile feature addition. I would find it >>>>>>>>>>>>>> really useful, for >>>>>>>>>>>>>> >>> one. 
>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> Chet >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik < >>>>>>>>>>>>>> lc...@google.com> wrote: >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> Instead of a callback fn, its most useful if a >>>>>>>>>>>>>> PCollection is returned >>>>>>>>>>>>>> >>> containing the result of the sink so that any arbitrary >>>>>>>>>>>>>> additional functions >>>>>>>>>>>>>> >>> can be applied. >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré < >>>>>>>>>>>>>> j...@nanthrax.net> >>>>>>>>>>>>>> >>> wrote: >>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>> >>>> Agree, I would prefer to do the callback in the IO more >>>>>>>>>>>>>> than in the >>>>>>>>>>>>>> >>>> main. >>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>> >>>> Regards >>>>>>>>>>>>>> >>>> JB >>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote: >>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>> >>>>> I do something almost exactly like this, but with >>>>>>>>>>>>>> BigtableIO instead. >>>>>>>>>>>>>> >>>>> I have a pull request open here [1] (which reminds me I >>>>>>>>>>>>>> need to finish this >>>>>>>>>>>>>> >>>>> up...). It would really be nice for most IOs to >>>>>>>>>>>>>> support something like >>>>>>>>>>>>>> >>>>> this. >>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on >>>>>>>>>>>>>> the output from >>>>>>>>>>>>>> >>>>> the BigtableIO, and then feed that into your function >>>>>>>>>>>>>> which will run when >>>>>>>>>>>>>> >>>>> all writes finish. >>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>> >>>>> You probably want to avoid doing something in the main >>>>>>>>>>>>>> method because >>>>>>>>>>>>>> >>>>> there's no guarantee it'll actually run (maybe the >>>>>>>>>>>>>> driver will die, get >>>>>>>>>>>>>> >>>>> killed, machine will explode, etc). 
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> [1] https://github.com/apache/beam/pull/3997
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Assuming you're in Java. You could just follow on in your Main
>>>>>>>>>>>>>> >>>>> method. Checking the state of the Result.
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Example:
>>>>>>>>>>>>>> >>>>> PipelineResult result = pipeline.run();
>>>>>>>>>>>>>> >>>>> try {
>>>>>>>>>>>>>> >>>>>   result.waitUntilFinish();
>>>>>>>>>>>>>> >>>>>   if(result.getState() == PipelineResult.State.DONE) {
>>>>>>>>>>>>>> >>>>>     //DO ES work
>>>>>>>>>>>>>> >>>>>   }
>>>>>>>>>>>>>> >>>>> } catch(Exception e) {
>>>>>>>>>>>>>> >>>>>   result.cancel();
>>>>>>>>>>>>>> >>>>>   throw e;
>>>>>>>>>>>>>> >>>>> }
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Otherwise you could also use Oozie to construct a work flow.
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>>>>>>>>>>>>>> >>>>> <j...@nanthrax.net> wrote:
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Hi,
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> yes, we had a similar question some days ago.
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> We can imagine to have a user callback fn fired when the sink
>>>>>>>>>>>>>> >>>>> batch is complete.
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Let me think about that.
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Regards
>>>>>>>>>>>>>> >>>>> JB
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> On 12/01/2017 09:04 AM, Philip Chan wrote:
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Hey JB,
>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>> >>>>> Thanks for getting back so quickly.
>>>>>>>>>>>>>> >>>>> I suppose in that case I would need a way of monitoring when the
>>>>>>>>>>>>>> >>>>> ES transform completes successfully before I can proceed with
>>>>>>>>>>>>>> >>>>> doing the swap.
>>>>>>>>>>>>>> >>>>> The problem with this is that I can't think of a good way to
>>>>>>>>>>>>>> >>>>> determine that termination state short of polling the new index
>>>>>>>>>>>>>> >>>>> to check the document count compared to the size of input
>>>>>>>>>>>>>> >>>>> PCollection.
>>>>>>>>>>>>>> >>>>> That, or maybe I'd need to use an external system like you
>>>>>>>>>>>>>> >>>>> mentioned to poll on the state of the pipeline (I'm using Google
>>>>>>>>>>>>>> >>>>> Dataflow, so maybe there's a way to do this with some API).
>>>>>>>>>>>>>> >>>>> But I would have thought that there would be an easy way of
>>>>>>>>>>>>>> >>>>> simply saying "do not process this transform until this other
>>>>>>>>>>>>>> >>>>> transform completes".
>>>>>>>>>>>>>> >>>>> Is there no established way of "signaling" between pipelines
>>>>>>>>>>>>>> >>>>> when some pipeline completes, or have some way of declaring a
>>>>>>>>>>>>>> >>>>> dependency of 1 transform on another transform?
> Thanks again,
> Philip
>
> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
>> Hi Philip,
>>
>> You won't be able to do (3) in the same pipeline, because the
>> Elasticsearch sink PTransform ends the pipeline with PDone.
>>
>> So (3) has to be done in another pipeline (using a DoFn) or in another
>> "system" (like Camel, for instance). I would check the data in the
>> index and then trigger the swap there.
>>
>> Regards
>> JB
>>
>> On 12/01/2017 08:41 AM, Philip Chan wrote:
>>
>>> Hi,
>>>
>>> I'm pretty new to Beam, and I've been trying to use the
>>> ElasticSearchIO sink to write docs into ES.
>>> With this, I want to be able to
>>> 1. ingest and transform rows from DB (done)
>>> 2. write JSON docs/strings into a new ES index (done)
>>> 3. After (2) is complete and all documents are written into a new
>>> index, trigger an atomic index swap under an alias to replace the
>>> current aliased index with the new index generated in step 2. This
>>> is basically a single POST request to the ES cluster.
>>>
>>> The problem I'm facing is that I can't find a way to make (3)
>>> happen only after step (2) is complete.
>>>
>>> The ElasticSearchIO.Write transform returns a PDone, and I'm not
>>> sure how to proceed from there because it doesn't seem to let me do
>>> another apply on it to "define" a dependency.
>>>
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>
>>> Is there a recommended way to construct pipeline workflows like
>>> this?
>>>
>>> Thanks in advance,
>>> Philip
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>
> --
> Nick Verbeck - NerdyNick
> ----------------------------------------------------
> NerdyNick.com
> TrailsOffroad.com
> NoKnownBoundaries.com
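
For reference, the "single POST request" in step 3 of the quoted question most likely corresponds to the Elasticsearch `_aliases` endpoint, which applies a list of remove/add actions as one atomic operation. Below is a minimal sketch of the request body only, assuming that endpoint is what was meant. The class name, the alias `docs` and the index names `docs_v1` / `docs_v2` are hypothetical and do not come from the thread. Actually sending the request, and making sure it runs only after the write has finished, is left out on purpose.

```java
/** Sketch only: builds the JSON body for an alias swap; all names are illustrative. */
final class AliasSwapSketch {

  /** Path of the Elasticsearch aliases endpoint, relative to the cluster base URL. */
  static final String ALIASES_PATH = "/_aliases";

  /** Wraps a value in double quotes, escaping backslashes and embedded quotes. */
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /**
   * Builds a body that removes the alias from oldIndex and adds it to newIndex.
   * Both actions travel in one request so the alias never points at nothing.
   */
  static String buildAliasSwapBody(String alias, String oldIndex, String newIndex) {
    return "{\"actions\":["
        + "{\"remove\":{\"index\":" + quote(oldIndex) + ",\"alias\":" + quote(alias) + "}},"
        + "{\"add\":{\"index\":" + quote(newIndex) + ",\"alias\":" + quote(alias) + "}}"
        + "]}";
  }

  public static void main(String[] args) {
    // Hypothetical names: alias "docs" moves from docs_v1 to docs_v2.
    System.out.println("POST " + ALIASES_PATH);
    System.out.println(buildAliasSwapBody("docs", "docs_v1", "docs_v2"));
  }
}
```

The body would be POSTed to the cluster with whatever HTTP client is at hand, from a separate job or system as JB suggests, once the new index has been checked.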