Hi Amit,

Glad you liked the proposal! Yes, adding the power to sequence reading of
sources against other things happening in the pipeline is one of the
biggest benefits.

I think this proposal is fully compatible with having a runner override for
TextIO: either way TextIO.Read() produces a composite transform, and the
runner can override the implementation of this transform regardless of
whether it uses Read.from() or SDF under the hood.
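
To make the override point concrete, here is a minimal toy sketch in plain Java. It is not Beam code: Composite, Runner and registerOverride are hypothetical names invented for illustration, and the "lines" are just tagged strings. It only shows that a runner keyed on the transform's identity can substitute its own implementation without caring how the default expansion was built.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model only: none of these types are Beam classes. */
final class OverrideSketch {

  /** A composite transform: a name plus a default expansion (filepattern -> lines). */
  static final class Composite {
    final String name;
    final Function<String, List<String>> defaultExpansion;

    Composite(String name, Function<String, List<String>> defaultExpansion) {
      this.name = name;
      this.defaultExpansion = defaultExpansion;
    }
  }

  /** A runner that may swap in its own implementation, keyed by transform name. */
  static final class Runner {
    private final Map<String, Function<String, List<String>>> overrides = new HashMap<>();

    void registerOverride(String transformName, Function<String, List<String>> impl) {
      overrides.put(transformName, impl);
    }

    List<String> apply(Composite transform, String filepattern) {
      // The runner never inspects how the default expansion is implemented.
      return overrides.getOrDefault(transform.name, transform.defaultExpansion)
          .apply(filepattern);
    }
  }

  public static void main(String[] args) {
    // Two hypothetical default expansions of the same composite transform.
    Composite viaSource = new Composite("TextIO.Read", p -> Arrays.asList("source:" + p));
    Composite viaSdf = new Composite("TextIO.Read", p -> Arrays.asList("sdf:" + p));

    Runner generic = new Runner();
    Runner spark = new Runner();
    spark.registerOverride("TextIO.Read", p -> Arrays.asList("native:" + p));

    System.out.println(generic.apply(viaSource, "/logs/*"));
    System.out.println(generic.apply(viaSdf, "/logs/*"));
    // With the override registered, both expansions are replaced the same way.
    System.out.println(spark.apply(viaSource, "/logs/*"));
    System.out.println(spark.apply(viaSdf, "/logs/*"));
  }
}
```

In this toy model the overriding runner returns the same result for both composites, which is the sense in which the override is independent of what is under the hood.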

Perhaps you're concerned about what a "dynamic" TextIO transform (one that
reads from a collection of filepatterns) would look like, and whether the
Spark runner could support it efficiently?
I'm not familiar enough with the capabilities of the Spark runner. Best case,
the runner would have an efficient override for this transform too; worst
case, we'd have two overloads, one for a fixed filepattern and one for a
PCollection of filepatterns, with only one of these efficiently overridden by
the Spark runner. Does this make sense?

Thanks.

On Mon, Aug 8, 2016 at 9:44 AM Amit Sela <amitsel...@gmail.com> wrote:

> Hi Eugene,
>
> I really like the proposal, especially the part about embedding non-Beam
> jobs and export jobs prior to pipeline execution. Up until now, such work
> would have been managed by some 3rd-party orchestrator that monitors the end
> of the preceding job and then executes the pipeline. Having this control at
> the *SDF* level sounds really great.
>
> I wonder how you see "high-level/direct" translations being incorporated
> here. The Spark runner, for example, will prefer to translate TextIO
> directly into its own API for reading with a file pattern, assuming that
> Spark's implementation is optimal (for Spark).
>
> Thanks,
> Amit
>
>
> On Mon, Aug 8, 2016 at 12:33 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Jip, thanks, that answers it.
> >
> > On Fri, 5 Aug 2016 at 19:51 Eugene Kirpichov
> > <kirpic...@google.com.invalid> wrote:
> >
> > > Hi Aljoscha,
> > >
> > > > AFAIK, the effect of .requiresDeduping() is that the runner inserts a
> > > > GBK/dedup transform on top of the read. This seems entirely compatible
> > > > with SDF, except it will be decoupled from the SDF itself: if an SDF
> > > > produces output that potentially contains duplicates, and there's no
> > > > easy way to fix it in the SDF itself, and you (developer of the
> > > > connector) would like to eliminate them, you can explicitly compose the
> > > > SDF with a canned deduping transform. Does this address your question?
> > >
> > > Thanks!
> > >
> > > On Fri, Aug 5, 2016 at 7:14 AM Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > > I really like the proposal, especially how it unifies a lot of
> > > > > things.
> > > > >
> > > > > One question: How would this work with sources that (right now)
> > > > > return true from UnboundedSource.requiresDeduping()? As I understand
> > > > > it, the code that executes such sources has to do bookkeeping to
> > > > > ensure that we don't get duplicate values. Would we add such a
> > > > > feature for the output of DoFns, or would we work towards removing
> > > > > the deduping functionality from Beam and push it into the source
> > > > > implementations?
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > > On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <j...@nanthrax.net>
> > > > > wrote:
> > > >
> > > > > > By the way, I like the use cases you are introducing: we discussed
> > > > > > similar use cases with Dan.
> > > > > >
> > > > > > I just wonder about the existing IO.
> > > > >
> > > > > Regards
> > > > > JB
> > > > >
> > > > >
> > > > > On August 4, 2016 7:46:14 PM Eugene Kirpichov
> > > > > <kirpic...@google.com.INVALID> wrote:
> > > > >
> > > > > > Hello Beam community,
> > > > > >
> > > > > > > We (myself, Daniel Mills and Robert Bradshaw) would like to
> > > > > > > propose "Splittable DoFn" - a major generalization of DoFn, which
> > > > > > > allows processing of a single element to be non-monolithic, i.e.
> > > > > > > checkpointable and parallelizable, as well as doing an unbounded
> > > > > > > amount of work per element.
> > > > > > >
> > > > > > > This allows effectively replacing the current
> > > > > > > Bounded/UnboundedSource APIs with DoFn's that are much easier to
> > > > > > > code, more scalable and composable with the rest of the Beam
> > > > > > > programming model, and enables many use cases that were previously
> > > > > > > difficult or impossible, as well as some non-obvious new use cases.
> > > > > >
> > > > > > > This proposal has been mentioned before in JIRA [BEAM-65] and some
> > > > > > > Beam meetings, and now the whole thing is written up in a document:
> > > > > >
> > > > > >         https://s.apache.org/splittable-do-fn
> > > > > >
> > > > > > Here are some things that become possible with Splittable DoFn:
> > > > > > - Efficiently read a filepattern matching millions of files
> > > > > > > - Read a collection of files that are produced by an earlier step
> > > > > > >   in the pipeline (e.g. easily implement a connector to a storage
> > > > > > >   system that can export itself to files)
> > > > > > > - Implement a Kafka reader by composing a "list partitions" DoFn
> > > > > > >   with a DoFn that simply polls a consumer and outputs new records
> > > > > > >   in a while() loop
> > > > > > > - Implement a log tailer by composing a DoFn that incrementally
> > > > > > >   returns new files in a directory and a DoFn that tails a file
> > > > > > > - Implement a parallel "count friends in common" algorithm (matrix
> > > > > > >   squaring) with good work balancing
> > > > > >
> > > > > > > Here is the meaningful part of a hypothetical Kafka reader written
> > > > > > > against this API:
> > > > > >
> > > > > >     ProcessContinuation processElement(
> > > > > >             ProcessContext context, OffsetRangeTracker tracker) {
> > > > > >       try (KafkaConsumer<String, String> consumer =
> > > > > >                 Kafka.subscribe(context.element().topic,
> > > > > >                                 context.element().partition)) {
> > > > > >         consumer.seek(tracker.start());
> > > > > >         while (true) {
> > > > > >           ConsumerRecords<String, String> records =
> > > > consumer.poll(100ms);
> > > > > >           if (records == null) return done();
> > > > > >           for (ConsumerRecord<String, String> record : records) {
> > > > > >             if (!tracker.tryClaim(record.offset())) {
> > > > > >               return
> > > > > resume().withFutureOutputWatermark(record.timestamp());
> > > > > >             }
> > > > > >             context.output(record);
> > > > > >           }
> > > > > >         }
> > > > > >       }
> > > > > >     }
> > > > > >
> > > > > > > The document describes in detail the motivations behind this
> > > > > > > feature, the basic idea and API, open questions, and outlines an
> > > > > > > incremental delivery plan.
> > > > > >
> > > > > > > The proposed API builds on the reflection-based new DoFn
> > > > > > > [new-do-fn] and is loosely related to "State and Timers for DoFn"
> > > > > > > [beam-state].
> > > > > >
> > > > > > Please take a look and comment!
> > > > > >
> > > > > > Thanks.
> > > > > >
> > > > > > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > > > > [new-do-fn] https://s.apache.org/a-new-do-fn
> > > > > > [beam-state] https://s.apache.org/beam-state
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
