Some answers inline.
@Raghu I'll review the PR tomorrow.

Thanks,
Amit

On Sat, Oct 8, 2016 at 3:47 AM Raghu Angadi <[email protected]>
wrote:

> On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <[email protected]> wrote:
>
> >    3. Support reading of Kafka partitions that were added to topic/s
> while
> >    a Pipeline reads from them - BEAM-727
> >    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> >
>
> I think this is doable (assuming some caveats about the generateInitialSplits()
> contract). How important is this feature?
>
Well, I don't know how to estimate the importance, but I can say that Kafka
allows this and Spark Streaming supports it: the way Apache Spark currently
reads from Kafka is by polling start and end offsets before scheduling every
micro-batch job, so if a new partition suddenly appears it is simply added to
the list of partitions to read from, and the driver program tracks its
offsets.
I assume Beam would like to support added partitions eventually. From what I
understand, Dataflow will have to restart the pipeline (Flink too?), but that
doesn't change the fact that we'll have to assign partitions in a way that
keeps previously assigned partitions the same.
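To illustrate the Spark-style approach described above, here is a minimal, hypothetical sketch (not Spark's or KafkaIO's actual code) of a driver-side tracker that diffs the current partition list against the partitions it already knows, so newly added partitions are picked up before the next micro-batch; partition discovery is simulated with plain collections, where the real code would query the Kafka consumer API:

```java
import java.util.*;

// Hypothetical driver-side tracker: before scheduling each micro-batch,
// diff the current partition list against the known set, start tracking
// offsets for any new partitions, and report which ones were added.
class PartitionTracker {
  // partition id -> last tracked offset, kept by the driver program
  private final Map<Integer, Long> offsets = new HashMap<>();

  /** Called before each micro-batch with the freshly polled partition list. */
  public List<Integer> refresh(List<Integer> currentPartitions) {
    List<Integer> added = new ArrayList<>();
    for (int p : currentPartitions) {
      if (!offsets.containsKey(p)) {
        offsets.put(p, 0L); // new partition: start reading from offset 0
        added.add(p);
      }
    }
    return added; // partitions that appeared since the last batch
  }

  public Map<Integer, Long> currentOffsets() {
    return Collections.unmodifiableMap(offsets);
  }
}
```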

>
> Some basic questions about Spark runner :
>
>    - do number of partitions stay same during life of a long running Beam
>    streaming job?
>
They should, yes; otherwise we'll have a consistency issue with partition
assignment.

>    - Will generateInitialSplits() be called more than once during the life
>    of a job?
>
This is an important point! Yes, every "batch interval" Spark will execute a
new "micro" job, and the runner will treat the UnboundedSource as a sort of
BoundedReadFromUnboundedSource with bounds on time (proportional to the
interval time) and possibly on records, so generateInitialSplits() will be
re-evaluated each time. That's why the runner should keep the original
(first) splitting, by supplying the UnboundedSource with the same
*desiredNumSplits* as negotiated at first (against the Spark backend -
YARN/Mesos etc.).
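The stability property above — re-evaluating the split logic each micro-batch with the same *desiredNumSplits* and getting the same splits back — can be sketched as a deterministic assignment; this is a simplified illustration under my own assumptions, not KafkaIO's actual implementation:

```java
import java.util.*;

// Sketch of a deterministic generateInitialSplits-style assignment:
// sort the partitions to fix an order, then distribute them round-robin
// over desiredNumSplits splits. Given the same partitions and the same
// desiredNumSplits, every invocation produces the identical grouping,
// so each split keeps the same partitions across micro-batches.
class SplitAssigner {
  static List<List<Integer>> assign(List<Integer> partitions, int desiredNumSplits) {
    List<Integer> sorted = new ArrayList<>(partitions);
    Collections.sort(sorted); // fix a canonical order first
    List<List<Integer>> splits = new ArrayList<>();
    for (int i = 0; i < desiredNumSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % desiredNumSplits).add(sorted.get(i)); // round-robin
    }
    return splits;
  }
}
```

Because the assignment depends only on the (sorted) partition list and *desiredNumSplits*, re-invoking it for every micro-batch — or after a restart with the preserved *desiredNumSplits* — yields the same split-to-partition mapping.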

>    - When a job is restarted, is generateInitialSplits() invoked again?
>       - if yes, do you expect  'desiredNumSplits' for
>       generateInitialSplits() to stay same as previous run?
>
As mentioned above, the execution itself is a constant restart/resubmission
of jobs, and upon a "real" restart (due to failure recovery or a
user-initiated stop-start) the runner is supposed to preserve the original
*desiredNumSplits*, and the UnboundedSource is expected to consistently
re-assign the splits.

>       - if no, are the readers instantiated from previous runs?
>
