It makes sense to consider how this maps onto existing kinds of sinks.

E.g.:
- Something that just makes an RPC per record, e.g. MqttIO.write(): it
will emit 1 result per bundle (either a bogus value or the number of records
written) that will be Combine'd into 1 result per pane of input. A user can
sequence against this and be notified when some intermediate amount of data
has been written for a window, or (via .isFinal()) when all of it has been
written.
- Something that e.g. initiates an import job, such as BigQueryIO.write(),
or an ElasticsearchIO write with a follow-up atomic index swap: it should
emit 1 result per import job, e.g. containing information about the job
(e.g. its id and statistics). The role of panes is the same.
- Something like the above that also supports dynamic destinations: as in
WriteFiles, the result will be a PCollection<KV<DestinationT, ResultT>>
where ResultT may be something like a list of files that were written for
this pane of this destination.
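
To make the first case concrete, here is a rough sketch in plain Java (no
Beam dependency) of per-bundle results being combined into one result per
pane, with the user reacting only to the final pane of a window. All names
here (PaneResultSketch, BundleResult, PaneResult, combinePerPane,
windowsFullyWritten) are made up for illustration and are not Beam API; the
window is modeled as a String and the pane as an index plus a "final" flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "1 result per bundle, combined into 1 result per pane". */
final class PaneResultSketch {

  /** Hypothetical per-bundle write result, tagged with its window and pane. */
  static final class BundleResult {
    final String window;       // stands in for the window of the input
    final int paneIndex;       // stands in for the pane index within the window
    final boolean finalPane;   // stands in for the "final pane" flag
    final long recordsWritten; // the per-bundle result value

    BundleResult(String window, int paneIndex, boolean finalPane, long recordsWritten) {
      this.window = window;
      this.paneIndex = paneIndex;
      this.finalPane = finalPane;
      this.recordsWritten = recordsWritten;
    }
  }

  /** Hypothetical combined result: exactly one per (window, pane). */
  static final class PaneResult {
    final String window;
    final int paneIndex;
    final boolean finalPane;
    long recordsWritten; // sum over all bundles of this pane

    PaneResult(String window, int paneIndex, boolean finalPane) {
      this.window = window;
      this.paneIndex = paneIndex;
      this.finalPane = finalPane;
    }
  }

  /** Sums bundle results so that each (window, pane) yields a single value. */
  static List<PaneResult> combinePerPane(List<BundleResult> bundles) {
    Map<String, PaneResult> byPane = new LinkedHashMap<>();
    for (BundleResult b : bundles) {
      String key = b.window + "#" + b.paneIndex;
      PaneResult r = byPane.get(key);
      if (r == null) {
        r = new PaneResult(b.window, b.paneIndex, b.finalPane);
        byPane.put(key, r);
      }
      r.recordsWritten += b.recordsWritten;
    }
    return new ArrayList<>(byPane.values());
  }

  /** Keeps only windows whose final pane has been written. */
  static List<String> windowsFullyWritten(List<PaneResult> panes) {
    List<String> done = new ArrayList<>();
    for (PaneResult p : panes) {
      if (p.finalPane) {
        done.add(p.window);
      }
    }
    return done;
  }

  public static void main(String[] args) {
    List<BundleResult> bundles = Arrays.asList(
        new BundleResult("w1", 0, false, 10),
        new BundleResult("w1", 0, false, 5),
        new BundleResult("w1", 1, true, 7),
        new BundleResult("w2", 0, false, 3));
    List<PaneResult> panes = combinePerPane(bundles);
    for (PaneResult p : panes) {
      System.out.println(p.window + " pane " + p.paneIndex
          + " records=" + p.recordsWritten + " final=" + p.finalPane);
    }
    System.out.println("Fully written: " + windowsFullyWritten(panes));
  }
}
```

With the sample data in main, window w1 gets two pane results (15 and 7
records) and only w1 is reported as fully written, since w2 has not yet
produced a final pane. In a real sink the combining step would be a Combine
over the bundle results, and the window and pane would come from the runner
rather than being carried as fields.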

On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> I agree that the proper API for enabling the use case "do something after
> the data has been written" is to return a PCollection of objects where each
> object represents the result of writing some identifiable subset of the
> data. Then one can apply a ParDo to this PCollection, in order to "do
> something after this subset has been written".
>
> The challenging part here is *identifying* the subset of the data that's
> been written, in a way consistent with Beam's unified batch/streaming
> model, where saying "all data has been written" is not an option because
> more data can arrive.
>
> The next choice is "a window of input has been written", but then again,
> late data can arrive into a window as well.
>
> Next choice after that is "a pane of input has been written", but per
> https://s.apache.org/beam-sink-triggers the term "pane of input" is moot:
> triggering and panes should be something private to the sink, and the same
> input can trigger different sinks differently. The hypothetical different
> accumulation modes make this trickier still. I'm not sure whether we intend
> to also challenge the idea that windowing is inherent to the collection, or
> whether it too should be specified on a transform that processes the
> collection. I think for the sake of this discussion we can assume that it's
> inherent, and assume the mental model that the elements in different
> windows of a PCollection are processed independently - "as if" there were
> multiple pipelines processing each window.
>
> Overall, embracing the full picture, we end up with something like this:
> - The input PCollection is a composition of windows.
> - If the windowing strategy is non-merging (e.g. fixed or sliding
> windows), the below applies to the entire contents of the PCollection. If
> it's merging (e.g. session windows), then it applies per-key, and the input
> should be (perhaps implicitly) keyed in a way that the sink understands -
> for example, the grouping by destination in DynamicDestinations in file and
> bigquery writes.
> - Each window's contents is a "changelog" - stream of elements and
> retractions.
> - A "sink" processes each window of the collection, deciding how to handle
> elements and retractions (and whether to support retractions at all) in a
> sink-specific way, and deciding *when* to perform the side effects for a
> portion of the changelog (a "pane") based on the sink's triggering strategy.
> - If the side effect itself is parallelized, then there'll be multiple
> results for the pane - e.g. one per bundle.
> - Each (sink-chosen) pane produces a set of results, e.g. a list of
> filenames that have been written, or simply a number of records that was
> written, or a bogus void value etc. The result will implicitly include the
> window of the input it's associated with. It will also implicitly include
> pane information - index of the pane in this window, and whether this is
> the first or last pane.
> - The partitioning into bundles is an implementation detail and not very
> useful, so before presenting the pane write results to the user, the sink
> will probably want to Combine the bundle results so that there ends up
> being 1 value for each pane that was written. Once again note that panes
> may be associated with windows of the input as a whole, but if the input is
> keyed (like with DynamicDestinations) they'll be associated with per-key
> subsets of windows of the input.
> - This combining requires an extra, well, combining operation, so it
> should be optional.
> - The user will end up getting either a PCollection<ResultT> or a
> PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where
> the elements of this collection will implicitly have window and pane
> information, available via the implicit BoundedWindow and PaneInfo.
> - Until "sink triggering" is implemented, we'll have to embrace the fact
> that trigger strategy is set on the input. But in that case the user will
> have to accept that the PaneInfo of ResultT's is not necessarily directly
> related to panes of the input - the sink is allowed to do internal
> aggregation as an implementation detail, which may modify the triggering
> strategy. Basically the user will still get sink-assigned panes.
> - In most cases, one may imagine that the user is interested in being
> notified of "no more data associated with this window will be written", so
> the user will ignore all ResultT's except those where the pane is marked
> final. If a user is interested in being notified of intermediate write
> results - they'll have to embrace the fact that they cannot identify the
> precise subset of input associated with the intermediate result.
>
> I think the really key points of the above are:
> - Sinks should support windowed input. Sinks should write different
> windows of input independently. If the sink can write multi-destination
> input, the destination should function as a grouping key, and in that case
> merging windowing should be allowed.
> - Producing a PCollection of write results should be optional.
> - When asked to produce results, sinks produce a PCollection of results
> that may be keyed or unkeyed (per above), and are placed in the window of
> the input that was written, and have a PaneInfo assigned by the sink, of
> which probably the only part useful to the user is whether it's .isFinal().
>
> Does this sound reasonable?
>
> On Mon, Dec 4, 2017 at 11:50 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> +1
>>
>> At the very least an empty PCollection<?> could be produced with no
>> promises about its contents but the ability to be followed (e.g. as a
>> side input), which is forward compatible with whatever actual metadata
>> one may decide to produce in the future.
>>
>> On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles <k...@google.com> wrote:
>> > +dev@
>> >
>> > I am in complete agreement with Luke. Data dependencies are easy to
>> > understand and a good way for an IO to communicate and establish causal
>> > dependencies. Converting an IO from PDone to real output may spur further
>> > useful thoughts based on the design decisions about what sort of output is
>> > most useful.
>> >
>> > Kenn
>> >
>> > On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik <lc...@google.com> wrote:
>> >>
>> >> I think all sinks actually do have valuable information to output which
>> >> can be used after a write (file names, transaction/commit/row ids, table
>> >> names, ...). In addition to this metadata, having a PCollection of all
>> >> successful writes and all failed writes is useful for users so they can
>> >> chain an action which depends on what was or wasn't successfully written.
>> >> Users have requested adding retry/failure handling policies to sinks so
>> >> that failed writes don't jam up the pipeline.
>> >>
>> >> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich <chet.aldr...@postmates.com>
>> >> wrote:
>> >>>
>> >>> So I agree generally with the idea that returning a PCollection makes all
>> >>> of this easier, since arbitrary additional functions can then be added.
>> >>> But what exactly would write functions return in a PCollection that would
>> >>> make sense? The whole idea is that we’ve written to an external source
>> >>> and now the collection itself is no longer needed.
>> >>>
>> >>> Currently, that’s represented with a PDone, but that doesn’t allow any
>> >>> work to occur after it. I see a couple of possible ways of handling this
>> >>> given this conversation, and am curious which solution sounds like the
>> >>> best way to deal with the problem:
>> >>>
>> >>> 1. Have output transforms always return something specific (which would
>> >>> be the same across transforms by convention), that is in the form of a
>> >>> PCollection, so operations can occur after it.
>> >>>
>> >>> 2. Make either PDone or some new type that can act as a PCollection so we
>> >>> can run applies afterward.
>> >>>
>> >>> 3. Make output transforms provide the facility for a callback function
>> >>> which runs after the transform is complete.
>> >>>
>> >>> I went through these gymnastics recently when I was trying to build
>> >>> something that would move indices after writing to Algolia, and the
>> >>> solution was to co-opt code from the old Sink class that used to exist in
>> >>> Beam. The problem is that particular method requires the output transform
>> >>> in question to return a PCollection, even if it is trivial or doesn’t make
>> >>> sense to return one. This seems like a bad solution, but unfortunately
>> >>> there isn’t a notion of a transform that has no explicit output that needs
>> >>> to have operations occur after it.
>> >>>
>> >>> The three potential solutions above address this issue, but I would like
>> >>> to hear which would be preferable (or perhaps a different proposal
>> >>> altogether?). Perhaps we could also start up a ticket on this, since it
>> >>> seems like a worthwhile feature addition. I would find it really useful,
>> >>> for one.
>> >>>
>> >>> Chet
>> >>>
>> >>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik <lc...@google.com> wrote:
>> >>>
>> >>> Instead of a callback fn, it's most useful if a PCollection is returned
>> >>> containing the result of the sink so that any arbitrary additional
>> >>> functions can be applied.
>> >>>
>> >>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> >>> wrote:
>> >>>>
>> >>>> Agreed, I would prefer to do the callback in the IO rather than in the
>> >>>> main method.
>> >>>>
>> >>>> Regards
>> >>>> JB
>> >>>>
>> >>>> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>> >>>>>
>> >>>>> I do something almost exactly like this, but with BigtableIO instead.
>> >>>>> I have a pull request open here [1] (which reminds me I need to finish
>> >>>>> this up...).  It would really be nice for most IOs to support something
>> >>>>> like this.
>> >>>>>
>> >>>>> Essentially you do a GroupByKey (or some CombineFn) on the output from
>> >>>>> the BigtableIO, and then feed that into your function which will run
>> >>>>> when all writes finish.
>> >>>>>
>> >>>>> You probably want to avoid doing something in the main method because
>> >>>>> there's no guarantee it'll actually run (maybe the driver will die, get
>> >>>>> killed, machine will explode, etc).
>> >>>>>
>> >>>>> [1] https://github.com/apache/beam/pull/3997
>> >>>>>
>> >>>>> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick <nerdyn...@gmail.com> wrote:
>> >>>>>
>> >>>>>     Assuming you're in Java, you could just follow on in your main
>> >>>>>     method, checking the state of the result.
>> >>>>>
>> >>>>>     Example:
>> >>>>>     PipelineResult result = pipeline.run();
>> >>>>>     try {
>> >>>>>       // Block until the pipeline reaches a terminal state.
>> >>>>>       result.waitUntilFinish();
>> >>>>>       if (result.getState() == PipelineResult.State.DONE) {
>> >>>>>         // DO ES work
>> >>>>>       }
>> >>>>>     } catch (Exception e) {
>> >>>>>       result.cancel();
>> >>>>>       throw e;
>> >>>>>     }
>> >>>>>
>> >>>>>     Otherwise you could also use Oozie to construct a work flow.
>> >>>>>
>> >>>>>     On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré
>> >>>>>     <j...@nanthrax.net> wrote:
>> >>>>>
>> >>>>>         Hi,
>> >>>>>
>> >>>>>         yes, we had a similar question some days ago.
>> >>>>>
>> >>>>>         We could imagine having a user callback fn fired when the sink
>> >>>>>         batch is complete.
>> >>>>>
>> >>>>>         Let me think about that.
>> >>>>>
>> >>>>>         Regards
>> >>>>>         JB
>> >>>>>
>> >>>>>         On 12/01/2017 09:04 AM, Philip Chan wrote:
>> >>>>>
>> >>>>>             Hey JB,
>> >>>>>
>> >>>>>             Thanks for getting back so quickly.
>> >>>>>             I suppose in that case I would need a way of monitoring when
>> >>>>>             the ES transform completes successfully before I can proceed
>> >>>>>             with doing the swap.
>> >>>>>             The problem with this is that I can't think of a good way to
>> >>>>>             determine that termination state short of polling the new
>> >>>>>             index to check the document count compared to the size of
>> >>>>>             input PCollection.
>> >>>>>             That, or maybe I'd need to use an external system like you
>> >>>>>             mentioned to poll on the state of the pipeline (I'm using
>> >>>>>             Google Dataflow, so maybe there's a way to do this with some
>> >>>>>             API).
>> >>>>>             But I would have thought that there would be an easy way of
>> >>>>>             simply saying "do not process this transform until this
>> >>>>>             other transform completes".
>> >>>>>             Is there no established way of "signaling" between pipelines
>> >>>>>             when some pipeline completes, or have some way of declaring
>> >>>>>             a dependency of 1 transform on another transform?
>> >>>>>
>> >>>>>             Thanks again,
>> >>>>>             Philip
>> >>>>>
>> >>>>>             On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré
>> >>>>>             <j...@nanthrax.net> wrote:
>> >>>>>
>> >>>>>                  Hi Philip,
>> >>>>>
>> >>>>>                  You won't be able to do (3) in the same pipeline, as the
>> >>>>>                  Elasticsearch Sink PTransform ends the pipeline with
>> >>>>>                  PDone.
>> >>>>>
>> >>>>>                  So, (3) has to be done in another pipeline (using a DoFn)
>> >>>>>                  or in another "system" (like Camel for instance). I would
>> >>>>>                  do a check of the data in the index and then trigger the
>> >>>>>                  swap there.
>> >>>>>
>> >>>>>                  Regards
>> >>>>>                  JB
>> >>>>>
>> >>>>>                  On 12/01/2017 08:41 AM, Philip Chan wrote:
>> >>>>>
>> >>>>>                      Hi,
>> >>>>>
>> >>>>>                      I'm pretty new to Beam, and I've been trying to use
>> >>>>>                      the ElasticSearchIO sink to write docs into ES.
>> >>>>>                      With this, I want to be able to
>> >>>>>                      1. ingest and transform rows from DB (done)
>> >>>>>                      2. write JSON docs/strings into a new ES index (done)
>> >>>>>                      3. After (2) is complete and all documents are
>> >>>>>                      written into a new index, trigger an atomic index
>> >>>>>                      swap under an alias to replace the current aliased
>> >>>>>                      index with the new index generated in step 2. This is
>> >>>>>                      basically a single POST request to the ES cluster.
>> >>>>>
>> >>>>>                      The problem I'm facing is that I can't seem to find
>> >>>>>                      a way for (3) to happen after step (2) is complete.
>> >>>>>
>> >>>>>                      The ElasticSearchIO.Write transform returns a PDone,
>> >>>>>                      and I'm not sure how to proceed from there because it
>> >>>>>                      doesn't seem to let me do another apply on it to
>> >>>>>                      "define" a dependency.
>> >>>>>
>> >>>>>
>> >>>>>                      https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>> >>>>>
>> >>>>>                      Is there a recommended way to construct pipeline
>> >>>>>                      workflows like this?
>> >>>>>
>> >>>>>                      Thanks in advance,
>> >>>>>                      Philip
>> >>>>>
>> >>>>>
>> >>>>>                  --
>> >>>>>                  Jean-Baptiste Onofré
>> >>>>>                  jbono...@apache.org
>> >>>>>                  http://blog.nanthrax.net
>> >>>>>                  Talend - http://www.talend.com
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>         --
>> >>>>>         Jean-Baptiste Onofré
>> >>>>>         jbono...@apache.org
>> >>>>>         http://blog.nanthrax.net
>> >>>>>         Talend - http://www.talend.com
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>     --
>> >>>>>     Nick Verbeck - NerdyNick
>> >>>>>     ----------------------------------------------------
>> >>>>>     NerdyNick.com
>> >>>>>     TrailsOffroad.com
>> >>>>>     NoKnownBoundaries.com
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>> --
>> >>>> Jean-Baptiste Onofré
>> >>>> jbono...@apache.org
>> >>>> http://blog.nanthrax.net
>> >>>> Talend - http://www.talend.com
>> >>>
>> >>>
>> >>>
>> >>
>> >
>>
>
