Makes sense. Thanks Eugene.

On Mon, Aug 8, 2016, 21:28 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

> Hi Amit,
>
> Glad you liked the proposal! Yes, adding the power to sequence reading of
> sources against other things happening in the pipeline is one of the
> biggest benefits.
>
> I think this proposal is fully compatible with having a runner override for
> TextIO: either way TextIO.Read() produces a composite transform, and the
> runner can override the implementation of this transform regardless of
> whether it uses Read.from() or SDF under the hood.
>
> Perhaps you're concerned about what a "dynamic" TextIO transform (one that
> reads from a collection of filepatterns) would look like, and whether the
> Spark runner could support it efficiently?
> I'm not familiar enough with the capabilities of the Spark runner: best case,
> the runner would have an efficient override for this transform too; worst
> case, we'd have two overloads - one for a fixed filepattern and one for a
> PCollection of filepatterns, only one of these efficiently overridden by
> the Spark runner. Does this make sense?
>
> Thanks.
>
> On Mon, Aug 8, 2016 at 9:44 AM Amit Sela <amitsel...@gmail.com> wrote:
>
> > Hi Eugene,
> >
> > I really like the proposal, especially the part about embedding non-Beam
> > jobs and export jobs prior to pipeline execution. Up until now, such work
> > would have been managed by some third-party orchestrator that monitors the
> > end of the preceding job and then executes the pipeline. Having this
> > control at the *SDF* level sounds really great.
> >
> > I wonder how you see "high-level/direct" translations incorporated here.
> > The Spark runner, for example, will prefer to translate TextIO directly
> > into its own API for reading with a file pattern, assuming that Spark's
> > implementation is optimal (for Spark).
> >
> > Thanks,
> > Amit
> >
> >
> > On Mon, Aug 8, 2016 at 12:33 PM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Jip, thanks, that answers it.
> > >
> > > On Fri, 5 Aug 2016 at 19:51 Eugene Kirpichov <kirpic...@google.com.invalid>
> > > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > AFAIK, the effect of .requiresDeduping() is that the runner inserts a
> > > > GBK/dedup transform on top of the read. This seems entirely compatible
> > > > with SDF, except it will be decoupled from the SDF itself: if an SDF
> > > > produces output that potentially contains duplicates, and there's no easy
> > > > way to fix it in the SDF itself, and you (developer of the connector)
> > > > would like to eliminate them, you can explicitly compose the SDF with a
> > > > canned deduping transform. Does this address your question?
> > > >
> > > > Thanks!
> > > >
> > > > On Fri, Aug 5, 2016 at 7:14 AM Aljoscha Krettek <aljos...@apache.org>
> > > > wrote:
> > > >
> > > > > I really like the proposal, especially how it unifies a lot of things.
> > > > >
> > > > > One question: how would this work with sources that (right now) return
> > > > > true from UnboundedSource.requiresDeduping()? As I understand it, the
> > > > > code that executes such sources has to do bookkeeping to ensure that we
> > > > > don't get duplicate values. Would we add such a feature for the output
> > > > > of DoFns, or would we work towards removing the deduping functionality
> > > > > from Beam and pushing it into the source implementations?
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <j...@nanthrax.net>
> > > > > wrote:
> > > > >
> > > > > > By the way, I like the use cases you are introducing: we discussed
> > > > > > similar use cases with Dan.
> > > > > >
> > > > > > I just wonder about the existing IO.
> > > > > >
> > > > > > Regards
> > > > > > JB
> > > > > >
> > > > > >
> > > > > > On August 4, 2016 7:46:14 PM Eugene Kirpichov
> > > > > > <kirpic...@google.com.INVALID> wrote:
> > > > > >
> > > > > > > Hello Beam community,
> > > > > > >
> > > > > > > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > > > > > > "Splittable DoFn" - a major generalization of DoFn, which allows
> > > > > > > processing of a single element to be non-monolithic, i.e.
> > > > > > > checkpointable and parallelizable, as well as doing an unbounded
> > > > > > > amount of work per element.
> > > > > > >
> > > > > > > This allows effectively replacing the current Bounded/UnboundedSource
> > > > > > > APIs with DoFns that are much easier to code, more scalable and
> > > > > > > composable with the rest of the Beam programming model, and enables
> > > > > > > many use cases that were previously difficult or impossible, as well
> > > > > > > as some non-obvious new use cases.
> > > > > > >
> > > > > > > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> > > > > > > meetings, and now the whole thing is written up in a document:
> > > > > > >
> > > > > > >         https://s.apache.org/splittable-do-fn
> > > > > > >
> > > > > > > Here are some things that become possible with Splittable DoFn:
> > > > > > > - Efficiently read a filepattern matching millions of files
> > > > > > > - Read a collection of files that are produced by an earlier step in
> > > > > > > the pipeline (e.g. easily implement a connector to a storage system
> > > > > > > that can export itself to files)
> > > > > > > - Implement a Kafka reader by composing a "list partitions" DoFn with
> > > > > > > a DoFn that simply polls a consumer and outputs new records in a
> > > > > > > while() loop
> > > > > > > - Implement a log tailer by composing a DoFn that incrementally
> > > > > > > returns new files in a directory and a DoFn that tails a file
> > > > > > > - Implement a parallel "count friends in common" algorithm (matrix
> > > > > > > squaring) with good work balancing
> > > > > > >
> > > > > > > Here is the meaningful part of a hypothetical Kafka reader written
> > > > > > > against this API:
> > > > > > >
> > > > > > >     ProcessContinuation processElement(
> > > > > > >             ProcessContext context, OffsetRangeTracker tracker) {
> > > > > > >       try (KafkaConsumer<String, String> consumer =
> > > > > > >                 Kafka.subscribe(context.element().topic,
> > > > > > >                                 context.element().partition)) {
> > > > > > >         consumer.seek(tracker.start());
> > > > > > >         while (true) {
> > > > > > >           ConsumerRecords<String, String> records =
> > > > > > >               consumer.poll(100ms);
> > > > > > >           if (records == null) return done();
> > > > > > >           for (ConsumerRecord<String, String> record : records) {
> > > > > > >             if (!tracker.tryClaim(record.offset())) {
> > > > > > >               return resume().withFutureOutputWatermark(record.timestamp());
> > > > > > >             }
> > > > > > >             context.output(record);
> > > > > > >           }
> > > > > > >         }
> > > > > > >       }
> > > > > > >     }
> > > > > > >
> > > > > > > The document describes in detail the motivations behind this feature,
> > > > > > > the basic idea and API, open questions, and outlines an incremental
> > > > > > > delivery plan.
> > > > > > >
> > > > > > > The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > > > > > > and is loosely related to "State and Timers for DoFn" [beam-state].
> > > > > > >
> > > > > > > Please take a look and comment!
> > > > > > >
> > > > > > > Thanks.
> > > > > > >
> > > > > > > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > > > > > [new-do-fn] https://s.apache.org/a-new-do-fn
> > > > > > > [beam-state] https://s.apache.org/beam-state
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
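
The hypothetical Kafka reader quoted above hinges on tracker.tryClaim(): the loop only outputs a record once its offset has been claimed, and it stops as soon as a claim fails. Below is a minimal sketch of that contract in plain Java, under my own assumptions. It does not use the Beam API: SimpleOffsetTracker, checkpoint() and process() are made-up names for illustration only, and the real tracker in the proposal may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

public class TryClaimSketch {

    /** Hypothetical stand-in for an offset-range tracker. NOT the Beam class. */
    static final class SimpleOffsetTracker {
        private final long start;
        private long end;         // exclusive; shrinks when a checkpoint is taken
        private long lastClaimed; // last offset successfully claimed

        SimpleOffsetTracker(long start, long end) {
            this.start = start;
            this.end = end;
            this.lastClaimed = start - 1;
        }

        long start() {
            return start;
        }

        /** Claims the offset if it still lies inside the (possibly shrunk) range. */
        synchronized boolean tryClaim(long offset) {
            if (offset >= end) {
                return false;
            }
            lastClaimed = offset;
            return true;
        }

        /**
         * Assumption: a checkpoint hands everything after the last claimed
         * offset to someone else and returns that residual range [from, to).
         */
        synchronized long[] checkpoint() {
            long residualStart = lastClaimed + 1;
            long residualEnd = end;
            end = residualStart;
            return new long[] {residualStart, residualEnd};
        }
    }

    /** Outputs offsets until a claim fails; a checkpoint is simulated mid-way. */
    static List<Long> process(SimpleOffsetTracker tracker, long checkpointAfter) {
        List<Long> output = new ArrayList<>();
        for (long offset = tracker.start(); ; offset++) {
            if (!tracker.tryClaim(offset)) {
                break; // the rest of the range now belongs to the residual
            }
            output.add(offset);
            if (offset == checkpointAfter) {
                tracker.checkpoint(); // stands in for a runner-initiated split
            }
        }
        return output;
    }

    public static void main(String[] args) {
        SimpleOffsetTracker tracker = new SimpleOffsetTracker(0, 10);
        System.out.println(process(tracker, 3)); // prints [0, 1, 2, 3]
    }
}
```

The point of the sketch is only that the processing loop never needs to know when or why the range shrank: it checks tryClaim() before every output and returns once a claim is refused.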

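On the deduping question in the thread: the suggestion was to compose a read that may emit duplicates with a separate, reusable dedup step rather than building the bookkeeping into the read itself. Here is a rough plain-Java sketch of that composition, again under my own assumptions. Message, readWithPossibleDuplicates() and dedupBy() are hypothetical names, and the in-memory set merely stands in for whatever a runner would actually do (the thread mentions a GBK/dedup transform).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class DedupCompositionSketch {

    /** Hypothetical record type: an id that identifies the record, plus a payload. */
    static final class Message {
        final String id;
        final String payload;

        Message(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Stand-in for a read step that may redeliver records, e.g. after a retry. */
    static List<Message> readWithPossibleDuplicates() {
        return Arrays.asList(
            new Message("a", "1"),
            new Message("b", "2"),
            new Message("a", "1"), // redelivered
            new Message("c", "3"),
            new Message("b", "2")); // redelivered
    }

    /** Generic "canned" dedup step: keeps the first element seen for each key. */
    static <T, K> List<T> dedupBy(List<T> input, Function<T, K> keyFn) {
        Set<K> seen = new HashSet<>();
        List<T> output = new ArrayList<>();
        for (T element : input) {
            if (seen.add(keyFn.apply(element))) { // add() is false for a repeated key
                output.add(element);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // The read knows nothing about deduping; the two steps are simply composed.
        List<Message> unique = dedupBy(readWithPossibleDuplicates(), m -> m.id);
        for (Message m : unique) {
            System.out.println(m.id + " -> " + m.payload);
        }
    }
}
```

The read step stays simple, and the dedup step can be reused behind any other source that has a usable record id.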