Thanks Raghu!

WIP PR for UnboundedSource support:
https://github.com/amitsela/incubator-beam/commits/BEAM-658-WIP
JIRA ticket: https://issues.apache.org/jira/browse/BEAM-658
Design document:
https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing

The PR still needs some polishing and better testing; I'll be working on
it this week.

Thanks,
Amit

On Sun, Oct 9, 2016 at 2:54 AM Raghu Angadi <[email protected]>
wrote:

> Thanks for the updates Amit.
>
> One follow up: when does the Spark runner ask for a CheckpointMark from
> the reader (at the end of a micro-batch)?
>
I've wrapped the entire Read within Spark's stateful operator
(mapWithState), so the CheckpointMark is saved at the end of each read.
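
A simplified, runner-agnostic sketch of that idea (all names here are illustrative stand-ins, not the actual Beam or Spark APIs): the reader's CheckpointMark is threaded through a stateful step, and the mark produced at the end of one micro-batch becomes the starting state of the next.

```python
# Hypothetical model of checkpointing per micro-batch. A real Spark runner
# would carry the mark inside mapWithState; here it's just a loop variable.

class FakeUnboundedReader:
    """Stands in for an UnboundedReader; reads from an in-memory list."""
    def __init__(self, records, start_offset):
        self.records = records
        self.offset = start_offset

    def read_batch(self, max_records):
        end = min(self.offset + max_records, len(self.records))
        batch = self.records[self.offset:end]
        self.offset = end
        return batch

    def get_checkpoint_mark(self):
        # For a Kafka-like source this would hold per-partition offsets.
        return self.offset


def run_micro_batch(records, checkpoint_mark, max_records):
    """One stateful-operator step: resume from the saved mark, read a
    bounded amount, and emit the new mark as the updated state."""
    reader = FakeUnboundedReader(records, checkpoint_mark)
    batch = reader.read_batch(max_records)
    return batch, reader.get_checkpoint_mark()


stream = list(range(10))
mark = 0
batches = []
for _ in range(3):                       # three micro-batches
    batch, mark = run_micro_batch(stream, mark, max_records=4)
    batches.append(batch)

print(batches)   # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
print(mark)      # 10
```

The key property is that no records are re-read or skipped across micro-batches, because each batch starts exactly where the previous mark left off.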

>
> The Spark runner seems different enough from Dataflow and Flink that it is
> a good showcase for the Beam runner API and implementations. It is no
> surprise that you are seeing some issues; I think we can work these out.
>
> I am interested in updating KafkaIO to support changes in Kafka partitions
> at run time. Looks like it would work for Spark too (as desiredNumSplits
> stays constant).
>
> You might not need to use 'BoundedReadFromUnboundedSource', but that could
> be a future improvement (e.g. the Dataflow runner has full control of the
> life cycle of a source/reader).
>
I'm using a slightly different version of `BoundedReadFromUnboundedSource`,
and I don't think I have a choice: I have to bound the read somehow, or the
micro-batch will grow indefinitely.
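
A minimal sketch of what bounding the read means here, in the spirit of BoundedReadFromUnboundedSource (the function and parameter names are hypothetical, not Beam's API): stop pulling from the unbounded source at either a record cap or a time cap, whichever comes first.

```python
# Illustrative bound on an unbounded read: cap by record count and by
# wall-clock time, so a micro-batch can never grow indefinitely.
import time

def bounded_read(next_record, max_records, max_duration_secs):
    """Pull from `next_record` (a callable returning a record, or None
    when nothing is currently available) until either limit is hit."""
    deadline = time.monotonic() + max_duration_secs
    out = []
    while len(out) < max_records and time.monotonic() < deadline:
        record = next_record()
        if record is None:          # source has nothing to offer right now
            break
        out.append(record)
    return out

# Usage: a source holding 100 records, bounded to 10 per "micro-batch".
it = iter(range(100))
batch = bounded_read(lambda: next(it, None),
                     max_records=10, max_duration_secs=1.0)
print(batch)   # [0, 1, 2, ..., 9]
```

In a micro-batch runner the time cap would be proportional to the batch interval, so one read can never delay the next scheduled batch.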

>
> I will take a look at the Spark runner. Do you have a branch that you are
> working with?
>
> Raghu.
>
>
> On Sat, Oct 8, 2016 at 11:33 AM, Amit Sela <[email protected]> wrote:
>
> > Some answers inline.
> > @Raghu I'll review the PR tomorrow.
> >
> > Thanks,
> > Amit
> >
> > On Sat, Oct 8, 2016 at 3:47 AM Raghu Angadi <[email protected]>
> > wrote:
> >
> > > On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <[email protected]>
> wrote:
> > >
> > > >    3. Support reading of Kafka partitions that were added to topic/s
> > > while
> > > >    a Pipeline reads from them - BEAM-727
> > > >    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> > > >
> > >
> > > I think this is doable (assuming some caveats about the
> > > generateInitialSplits() contract). How important is this feature?
> > >
> > Well, I don't know how to estimate the importance, but I can say that
> > Kafka allows this and Spark Streaming supports it: the way Apache Spark
> > currently reads from Kafka is by polling start-end offsets before
> > scheduling every micro-batch job, so if a new partition suddenly appears
> > it's simply added to the list of partitions to read from, and the driver
> > program tracks the offsets.
> > I assume Beam would like to support added partitions eventually, and from
> > what I understand Dataflow would have to restart the pipeline (Flink
> > too?), but that doesn't change the fact that we'll have to assign splits
> > in a way that keeps previously assigned partitions the same.
> >
> > >
> > > Some basic questions about Spark runner :
> > >
> > >    - does the number of partitions stay the same during the life of a
> > >    long-running Beam streaming job?
> > >
> > They should, yes, otherwise we'll have a consistency issue with partition
> > assignment.
> >
> > >    - Will generateInitialSplits() be called more than once during the
> > >    life of a job?
> > >
> > This is an important point! Yes, every "batch interval" Spark will
> > execute a new "micro" job, and the runner will treat UnboundedSource as
> > a sort of BoundedReadFromUnboundedSource with bounds on time
> > (proportional to the interval time) and possibly records, so
> > generateInitialSplits() will be re-evaluated each time. That's why the
> > runner should keep the original (first) splitting, by supplying
> > UnboundedSource with the same desiredNumSplits as negotiated at first
> > (against the Spark backend - YARN/Mesos etc.).
> >
> > >    - When a job is restarted, is generateInitialSplits() invoked again?
> > >       - if yes, do you expect 'desiredNumSplits' for
> > >       generateInitialSplits() to stay the same as in the previous run?
> > >
> > As mentioned above, the execution itself is a constant
> > "restart/resubmission" of jobs; upon a "real" restart (due to failure
> > recovery or a user-initiated stop/start), the runner is supposed to
> > preserve the original desiredNumSplits, and the UnboundedSource is
> > expected to consistently re-assign the splits.
> >
> > >       - if no, are the readers instantiated from previous runs?
> > >
> >
>
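
The re-assignment contract discussed in the quoted thread can be sketched as follows. This is a hypothetical illustration, not KafkaIO's actual implementation: with a constant desiredNumSplits, generateInitialSplits() should place previously seen partitions in the same splits on every call, with newly added partitions only appended.

```python
# Deterministic round-robin assignment in partition order. Assumption:
# new partitions are only ever appended to the end of the list (as Kafka
# does with increasing partition ids), so earlier partitions keep landing
# in the same split across repeated calls.

def assign_splits(partitions, desired_num_splits):
    """Assign each partition to a split by its index modulo the
    (constant) desired number of splits."""
    splits = [[] for _ in range(desired_num_splits)]
    for i, p in enumerate(partitions):
        splits[i % desired_num_splits].append(p)
    return splits

first = assign_splits(["t-0", "t-1", "t-2", "t-3"], desired_num_splits=2)
# A partition "t-4" is added to the topic at run time:
second = assign_splits(["t-0", "t-1", "t-2", "t-3", "t-4"],
                       desired_num_splits=2)

print(first)    # [['t-0', 't-2'], ['t-1', 't-3']]
print(second)   # [['t-0', 't-2', 't-4'], ['t-1', 't-3']]
```

Note that every previously assigned partition stays in its original split, which is exactly the consistency property the checkpointed offsets depend on.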
