Some answers inline. @Raghu I'll review the PR tomorrow. Thanks, Amit
On Sat, Oct 8, 2016 at 3:47 AM Raghu Angadi <[email protected]> wrote:

> On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <[email protected]> wrote:
>
> > 3. Support reading of Kafka partitions that were added to topic/s while
> > a Pipeline reads from them - BEAM-727
> > <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>
> I think this is doable (assuming some caveats about generateInitialSplits()
> contract). How important is this feature?

Well, I don't know how to estimate the importance, but I can say that Kafka
allows this, and Spark Streaming supports it. The way Apache Spark currently
reads from Kafka is by polling start-end offsets before scheduling every
micro-batch job, so if a new partition suddenly appears it is simply added
to the list of partitions to read from, and the driver program tracks the
offsets.
I assume Beam would like to support added partitions eventually. From what
I understand, Dataflow will have to restart the pipeline (Flink too?), but
that doesn't change the fact that we'll have to assign partitions in a way
that keeps previously assigned partitions the same.

> Some basic questions about Spark runner :
>
> - do number of partitions stay same during life of a long running Beam
>   streaming job?

They should, yes, otherwise we'll have a consistency issue with partition
assignment.

> - Will generateInitialSplits() be called more than once during the life
>   of a job?

This is an important point! Yes, every "batch interval" Spark will execute
a new "micro" job, and the runner will treat UnboundedSource as a sort of
BoundedReadFromUnboundedSource with bounds on time (proportional to the
interval time) and possibly on records, so generateInitialSplits() will be
re-evaluated each time. That's why the runner should keep the original
(first) splitting, by supplying UnboundedSource with the same
*desiredNumSplits* as negotiated at first (against the Spark backend -
YARN/Mesos etc.).
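
The requirement above (same *desiredNumSplits* on every call, and previously assigned partitions staying where they are) can be sketched roughly as follows. This is only an illustration, not KafkaIO's actual code: the class and method names are hypothetical, it ignores multiple topics, and it assumes Kafka partition ids are the dense range 0..n-1 with added partitions receiving the next higher ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Beam or KafkaIO.
final class StableSplitAssignment {

    /**
     * Assigns partition ids 0..numPartitions-1 to splits round-robin:
     * partition p always lands in split (p % numSplits).
     * Assumption: partition ids are dense and new partitions only get
     * higher ids, so adding partitions never moves an existing one
     * as long as numSplits stays the same.
     */
    static Map<Integer, List<Integer>> assign(int numPartitions, int numSplits) {
        if (numSplits <= 0) {
            throw new IllegalArgumentException("numSplits must be positive");
        }
        if (numPartitions < 0) {
            throw new IllegalArgumentException("numPartitions must not be negative");
        }
        // TreeMap keeps splits ordered by index for readable output.
        Map<Integer, List<Integer>> splits = new TreeMap<>();
        for (int s = 0; s < numSplits; s++) {
            splits.put(s, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            splits.get(p % numSplits).add(p);
        }
        return splits;
    }

    public static void main(String[] args) {
        // Same number of splits before and after two partitions are added.
        System.out.println(assign(4, 3)); // {0=[0, 3], 1=[1], 2=[2]}
        System.out.println(assign(6, 3)); // {0=[0, 3], 1=[1, 4], 2=[2, 5]}
    }
}
```

With the number of splits held at 3, growing the topic from 4 to 6 partitions only appends partitions 4 and 5 to existing splits. If the number of splits changed between calls (say from 3 to 2), partition 3 would move from split 0 to split 1, which is the consistency issue mentioned above.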
> - When a job is restarted, is generateInitialSplits() invoked again?
>   - if yes, do you expect 'desiredNumSplits' for
>     generateInitialSplits() to stay same as previous run?

As mentioned above, the execution itself is a constant
"restart/resubmission" of jobs, and upon a "real" restart (due to failure
recovery or a user-initiated stop-start) the runner is supposed to preserve
the original *desiredNumSplits*, and the UnboundedSource is expected to
consistently re-assign the splits.

>   - if no, are the readers instantiated from previous runs?
>
