It makes sense to consider how this maps onto existing kinds of sinks. E.g.:

- Something that just makes an RPC per record, e.g. MqttIO.write(): that will emit 1 result per bundle (either a bogus value or the number of records written), which will be Combine'd into 1 result per pane of input. A user can sequence against this and be notified when some intermediate amount of data has been written for a window, or (via .isFinal()) when all of it has been written.

- Something that initiates an import job, such as BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic index swap: this should emit 1 result per import job, e.g. containing information about the job (its id and statistics). The role of panes is the same.

- Something like the above that also supports dynamic destinations: as in WriteFiles, the result will be a PCollection<KV<DestinationT, ResultT>>, where ResultT may be something like the list of files that were written for this pane of this destination.
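To make the "sequence against this" idea concrete, here is a minimal sketch of what the user-side follow-up could look like, assuming a hypothetical sink whose write transform returns a PCollection<KV<String, WriteResult>> of per-destination results as described above. MySink, WriteResult and runFollowUp() are made-up names for illustration only; the real APIs involved are ParDo, DoFn and PaneInfo (the ".isFinal()" mentioned in this thread corresponds to PaneInfo.isLast() in the current SDK):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Hypothetical: the sink returns one result per destination per pane.
  PCollection<KV<String, WriteResult>> results =
      input.apply("WriteToSink", MySink.write());

  results.apply("AfterFinalPane", ParDo.of(
      new DoFn<KV<String, WriteResult>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // Each result implicitly carries the window and the PaneInfo
          // assigned by the sink; only act once the last pane for this
          // window/destination has been written.
          if (c.pane().isLast()) {
            runFollowUp(c.element().getKey(), c.element().getValue());
          }
        }
      }));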

On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> I agree that the proper API for enabling the use case "do something after the data has been written" is to return a PCollection of objects where each object represents the result of writing some identifiable subset of the data. Then one can apply a ParDo to this PCollection, in order to "do something after this subset has been written".
>
> The challenging part here is *identifying* the subset of the data that's been written, in a way consistent with Beam's unified batch/streaming model, where saying "all data has been written" is not an option because more data can arrive.
>
> The next choice is "a window of input has been written", but then again, late data can arrive into a window as well.
>
> Next choice after that is "a pane of input has been written", but per https://s.apache.org/beam-sink-triggers the term "pane of input" is moot: triggering and panes should be something private to the sink, and the same input can trigger different sinks differently. The hypothetical different accumulation modes make this trickier still. I'm not sure whether we intend to also challenge the idea that windowing is inherent to the collection, or whether it too should be specified on a transform that processes the collection. I think for the sake of this discussion we can assume that it's inherent, and assume the mental model that the elements in different windows of a PCollection are processed independently - "as if" there were multiple pipelines processing each window.
>
> Overall, embracing the full picture, we end up with something like this:
> - The input PCollection is a composition of windows.
> - If the windowing strategy is non-merging (e.g. fixed or sliding windows), the below applies to the entire contents of the PCollection. If it's merging (e.g. session windows), then it applies per-key, and the input should be (perhaps implicitly) keyed in a way that the sink understands - for example, the grouping by destination in DynamicDestinations in file and bigquery writes.
> - Each window's contents is a "changelog" - a stream of elements and retractions.
> - A "sink" processes each window of the collection, deciding how to handle elements and retractions (and whether to support retractions at all) in a sink-specific way, and deciding *when* to perform the side effects for a portion of the changelog (a "pane") based on the sink's triggering strategy.
> - If the side effect itself is parallelized, then there'll be multiple results for the pane - e.g. one per bundle.
> - Each (sink-chosen) pane produces a set of results, e.g. a list of filenames that have been written, or simply a number of records that was written, or a bogus void value, etc. The result will implicitly include the window of the input it's associated with. It will also implicitly include pane information - the index of the pane in this window, and whether this is the first or last pane.
> - The partitioning into bundles is an implementation detail and not very useful, so before presenting the pane write results to the user, the sink will probably want to Combine the bundle results so that there ends up being 1 value for each pane that was written. Once again note that panes may be associated with windows of the input as a whole, but if the input is keyed (like with DynamicDestinations) they'll be associated with per-key subsets of windows of the input.
> - This combining requires an extra, well, combining operation, so it should be optional.
> - The user will end up getting either a PCollection<ResultT> or a PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where the elements of this collection will implicitly have window and pane information, available via the implicit BoundedWindow and PaneInfo.
> - Until "sink triggering" is implemented, we'll have to embrace the fact that trigger strategy is set on the input. But in that case the user will have to accept that the PaneInfo of ResultT's is not necessarily directly related to panes of the input - the sink is allowed to do internal aggregation as an implementation detail, which may modify the triggering strategy. Basically the user will still get sink-assigned panes.
> - In most cases, one may imagine that the user is interested in being notified of "no more data associated with this window will be written", so the user will ignore all ResultT's except those where the pane is marked final. If a user is interested in being notified of intermediate write results - they'll have to embrace the fact that they cannot identify the precise subset of input associated with the intermediate result.
>
> I think the really key points of the above are:
> - Sinks should support windowed input. Sinks should write different windows of input independently. If the sink can write multi-destination input, the destination should function as a grouping key, and in that case merging windowing should be allowed.
> - Producing a PCollection of write results should be optional.
> - When asked to produce results, sinks produce a PCollection of results that may be keyed or unkeyed (per above), and are placed in the window of the input that was written, and have a PaneInfo assigned by the sink, of which probably the only part useful to the user is whether it's .isFinal().
>
> Does this sound reasonable?
>
> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> +1
>>
>> At the very least an empty PCollection<?> could be produced with no promises about its contents but the ability to be followed (e.g. as a side input), which is forward compatible with whatever actual metadata one may decide to produce in the future.
>>
>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
>> > +dev@
>> >
>> > I am in complete agreement with Luke. Data dependencies are easy to understand and a good way for an IO to communicate and establish causal dependencies. Converting an IO from PDone to real output may spur further useful thoughts based on the design decisions about what sort of output is most useful.
>> >
>> > Kenn
>> >
>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>> >>
>> >> I think all sinks actually do have valuable information to output which can be used after a write (file names, transaction/commit/row ids, table names, ...). In addition to this metadata, having a PCollection of all successful writes and all failed writes is useful for users so they can chain an action which depends on what was or wasn't successfully written. Users have requested adding retry/failure handling policies to sinks so that failed writes don't jam up the pipeline.
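
As a rough illustration of the "can be followed (e.g. as a side input)" idea above: a minimal sketch that gates a follow-up step on a write by consuming the write's result PCollection as a side input. Here writeResults is the hypothetical output of such a sink (its contents can be empty or Void), p is the Pipeline, and the follow-up body is a placeholder; View, Create, ParDo and sideInput() are the real APIs involved. It assumes the kickoff collection and the write results use compatible windowing (e.g. both globally windowed):

  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.PCollectionView;

  // writeResults: hypothetical PCollection<Void> returned by the sink's write.
  final PCollectionView<Iterable<Void>> writtenView =
      writeResults.apply("WrittenView", View.<Void>asIterable());

  p.apply("Kickoff", Create.of("run-follow-up"))
   .apply("AfterWrite", ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // The runner will not run this step for a window until the side
        // input for that window is ready, i.e. until the write has produced
        // its results there; the contents themselves can be ignored.
        c.sideInput(writtenView);
        // ... perform the dependent action here ...
      }
    }).withSideInputs(writtenView));
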
>> >>
>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com> wrote:
>> >>>
>> >>> So I agree generally with the idea that returning a PCollection makes all of this easier so that arbitrary additional functions can be added, but what exactly would write functions be returning in a PCollection that would make sense? The whole idea is that we’ve written to an external source and now the collection itself is no longer needed.
>> >>>
>> >>> Currently, that’s represented with a PDone, but currently that doesn’t allow any work to occur after it. I see a couple possible ways of handling this given this conversation, and am curious which solution sounds like the best way to deal with the problem:
>> >>>
>> >>> 1. Have output transforms always return something specific (which would be the same across transforms by convention), that is in the form of a PCollection, so operations can occur after it.
>> >>>
>> >>> 2. Make either PDone or some new type that can act as a PCollection so we can run applies afterward.
>> >>>
>> >>> 3. Make output transforms provide the facility for a callback function which runs after the transform is complete.
>> >>>
>> >>> I went through these gymnastics recently when I was trying to build something that would move indices after writing to Algolia, and the solution was to co-opt code from the old Sink class that used to exist in Beam. The problem is that particular method requires the output transform in question to return a PCollection, even if it is trivial or doesn’t make sense to return one. This seems like a bad solution, but unfortunately there isn’t a notion of a transform that has no explicit output that needs to have operations occur after it.
>> >>>
>> >>> The three potential solutions above address this issue, but I would like to hear which would be preferable (or perhaps a different proposal altogether?). Perhaps we could also start up a ticket on this, since it seems like a worthwhile feature addition. I would find it really useful, for one.
>> >>>
>> >>> Chet
>> >>>
>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>> >>>
>> >>> Instead of a callback fn, it's most useful if a PCollection is returned containing the result of the sink so that any arbitrary additional functions can be applied.
>> >>>
>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >>>>
>> >>>> Agree, I would prefer to do the callback in the IO more than in the main.
>> >>>>
>> >>>> Regards
>> >>>> JB
>> >>>>
>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>> >>>>>
>> >>>>> I do something almost exactly like this, but with BigtableIO instead. I have a pull request open here [1] (which reminds me I need to finish this up...). It would really be nice for most IOs to support something like this.
>> >>>>>
>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from the BigtableIO, and then feed that into your function which will run when all writes finish.
>> >>>>>
>> >>>>> You probably want to avoid doing something in the main method because there's no guarantee it'll actually run (maybe the driver will die, get killed, machine will explode, etc).
>> >>>>>
>> >>>>> [1] https://github.com/apache/beam/pull/3997
>> >>>>>
>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>> >>>>>
>> >>>>> Assuming you're in Java. You could just follow on in your Main method. Checking the state of the Result.
>> >>>>>
>> >>>>> Example:
>> >>>>> PipelineResult result = pipeline.run();
>> >>>>> try {
>> >>>>>     result.waitUntilFinish();
>> >>>>>     if(result.getState() == PipelineResult.State.DONE) {
>> >>>>>         //DO ES work
>> >>>>>     }
>> >>>>> } catch(Exception e) {
>> >>>>>     result.cancel();
>> >>>>>     throw e;
>> >>>>> }
>> >>>>>
>> >>>>> Otherwise you could also use Oozie to construct a work flow.
>> >>>>>
>> >>>>> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> yes, we had a similar question some days ago.
>> >>>>>
>> >>>>> We can imagine to have a user callback fn fired when the sink batch is complete.
>> >>>>>
>> >>>>> Let me think about that.
>> >>>>>
>> >>>>> Regards
>> >>>>> JB
>> >>>>>
>> >>>>> On 12/01/2017 09:04 AM, Philip Chan wrote:
>> >>>>>
>> >>>>> Hey JB,
>> >>>>>
>> >>>>> Thanks for getting back so quickly. I suppose in that case I would need a way of monitoring when the ES transform completes successfully before I can proceed with doing the swap. The problem with this is that I can't think of a good way to determine that termination state short of polling the new index to check the document count compared to the size of input PCollection. That, or maybe I'd need to use an external system like you mentioned to poll on the state of the pipeline (I'm using Google Dataflow, so maybe there's a way to do this with some API). But I would have thought that there would be an easy way of simply saying "do not process this transform until this other transform completes". Is there no established way of "signaling" between pipelines when some pipeline completes, or have some way of declaring a dependency of 1 transform on another transform?
>> >>>>>
>> >>>>> Thanks again,
>> >>>>> Philip
>> >>>>>
>> >>>>> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >>>>>
>> >>>>> Hi Philip,
>> >>>>>
>> >>>>> You won't be able to do (3) in the same pipeline as the Elasticsearch Sink PTransform ends the pipeline with PDone.
>> >>>>>
>> >>>>> So, (3) has to be done in another pipeline (using a DoFn) or in another "system" (like Camel for instance). I would do a check of the data in the index and then trigger the swap there.
>> >>>>>
>> >>>>> Regards
>> >>>>> JB
>> >>>>>
>> >>>>> On 12/01/2017 08:41 AM, Philip Chan wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> I'm pretty new to Beam, and I've been trying to use the ElasticSearchIO sink to write docs into ES. With this, I want to be able to
>> >>>>> 1. ingest and transform rows from DB (done)
>> >>>>> 2. write JSON docs/strings into a new ES index (done)
>> >>>>> 3. After (2) is complete and all documents are written into a new index, trigger an atomic index swap under an alias to replace the current aliased index with the new index generated in step 2. This is basically a single POST request to the ES cluster.
>> >>>>>
>> >>>>> The problem I'm facing is that I don't seem to be able to find a way to have a way for (3) to happen after step (2) is complete.
>> >>>>>
>> >>>>> The ElasticSearchIO.Write transform returns a PDone, and I'm not sure how to proceed from there because it doesn't seem to let me do another apply on it to "define" a dependency.
>> >>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>> >>>>>
>> >>>>> Is there a recommended way to construct pipelines workflows like this?
>> >>>>>
>> >>>>> Thanks in advance,
>> >>>>> Philip
>> >>>>>
>> >>>>> --
>> >>>>> Jean-Baptiste Onofré
>> >>>>> jbono...@apache.org
>> >>>>> http://blog.nanthrax.net
>> >>>>> Talend - http://www.talend.com
>> >>>>>
>> >>>>> --
>> >>>>> Jean-Baptiste Onofré
>> >>>>> jbono...@apache.org
>> >>>>> http://blog.nanthrax.net
>> >>>>> Talend - http://www.talend.com
>> >>>>>
>> >>>>> --
>> >>>>> Nick Verbeck - NerdyNick
>> >>>>> ----------------------------------------------------
>> >>>>> NerdyNick.com
>> >>>>> TrailsOffroad.com
>> >>>>> NoKnownBoundaries.com
>> >>>>
>> >>>> --
>> >>>> Jean-Baptiste Onofré
>> >>>> jbono...@apache.org
>> >>>> http://blog.nanthrax.net
>> >>>> Talend - http://www.talend.com
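
For completeness, a minimal sketch of the "Combine the output, then run your function when all writes finish" pattern discussed above, applied to the index-swap use case. It assumes a write transform that returns a PCollection of per-write results (hypothetical today for most IOs; the pull request referenced as [1] proposes this for BigtableIO), and swapIndexAlias() is a made-up helper standing in for the single alias-swap POST to the ES cluster:

  import org.apache.beam.sdk.transforms.Combine;
  import org.apache.beam.sdk.transforms.Count;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.PCollection;

  // writeResults: hypothetical per-record/per-bundle results of the write.
  PCollection<Long> writesDone = writeResults.apply(
      "CountWrites",
      Combine.globally(Count.<WriteResult>combineFn()).withoutDefaults());

  writesDone.apply("SwapIndexAlias", ParDo.of(new DoFn<Long, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // With the default trigger, this fires only after the Combine has seen
      // all write results for the window, i.e. after the writes themselves
      // have completed.
      swapIndexAlias();
    }
  }));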