Thanks for the updates, Amit. One follow-up: when does the Spark runner ask the reader for its CheckpointMark (at the end of a micro-batch)?
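For context, this is roughly the handshake I have in mind (a minimal sketch
against the UnboundedSource API; the deadline parameter is just a stand-in for
however the Spark runner actually bounds a micro-batch, and finalizing
immediately is a simplification - a real runner would defer it until the
batch's output is durably committed):

    import java.io.IOException;
    import org.apache.beam.sdk.io.UnboundedSource;

    static <T> UnboundedSource.CheckpointMark readOneBatch(
        UnboundedSource.UnboundedReader<T> reader, long deadlineMillis)
        throws IOException {
      boolean available = reader.start();
      while (System.currentTimeMillis() < deadlineMillis) {
        if (available) {
          T record = reader.getCurrent();
          // hand `record` to the rest of the micro-batch here
        }
        available = reader.advance();
      }
      // Snapshot the reader's position at the end of the micro-batch.
      UnboundedSource.CheckpointMark mark = reader.getCheckpointMark();
      // Simplification: finalize immediately; in practice this should happen
      // only after the checkpoint is durably persisted (for KafkaIO this is
      // where consumed offsets could be committed back to Kafka).
      mark.finalizeCheckpoint();
      return mark;
    }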
The Spark runner seems different enough from Dataflow and Flink that it is a
good showcase for the Beam runner API and its implementations. It is no
surprise that you are seeing some issues; I think we can work these out.

I am interested in updating KafkaIO to support changes in Kafka partitions at
run time. Looks like it would work for Spark too (as desiredNumSplits stays
constant) - a sketch of the kind of deterministic assignment I have in mind
follows at the end of this message. You might not need to use
BoundedReadFromUnboundedSource, but that could be a future improvement (e.g.
the Dataflow runner has full control of the life cycle of a source/reader).

I will take a look at the Spark runner. Do you have a branch that you are
working with?

Raghu.

On Sat, Oct 8, 2016 at 11:33 AM, Amit Sela <[email protected]> wrote:

> Some answers inline.
> @Raghu I'll review the PR tomorrow.
>
> Thanks,
> Amit
>
> On Sat, Oct 8, 2016 at 3:47 AM Raghu Angadi <[email protected]> wrote:
>
> > On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <[email protected]> wrote:
> >
> > > 3. Support reading of Kafka partitions that were added to topic/s while
> > > a Pipeline reads from them - BEAM-727
> > > <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> >
> > I think this is doable (assuming some caveats about the
> > generateInitialSplits() contract). How important is this feature?
>
> Well, I don't know how to estimate the importance, but I can say that Kafka
> allows this and Spark Streaming supports it - the way Apache Spark
> currently reads from Kafka is by polling start/end offsets before
> scheduling every micro-batch job, so if a new partition suddenly appears
> it's simply added to the list of partitions to read from, and the driver
> program tracks the offsets.
> I assume Beam would like to support added partitions eventually, and from
> what I understand Dataflow will have to restart the pipeline (Flink too?),
> but it doesn't change the fact that we'll have to assign partitions in a
> way that keeps previously assigned partitions the same.
>
> > Some basic questions about the Spark runner:
> >
> > - Does the number of partitions stay the same during the life of a
> >   long-running Beam streaming job?
>
> They should, yes; otherwise we'll have a consistency issue with partition
> assignment.
>
> > - Will generateInitialSplits() be called more than once during the life
> >   of a job?
>
> This is an important point! Yes, every "batch interval" Spark will execute
> a new "micro" job, and the runner will treat the UnboundedSource as a sort
> of BoundedReadFromUnboundedSource with bounds on time (proportional to the
> interval time) and possibly on records, so generateInitialSplits() will be
> re-evaluated each time. That's why the runner should keep the original
> (first) splitting, by supplying the UnboundedSource with the same
> *desiredNumSplits* as negotiated at first (against the Spark backend -
> YARN/Mesos etc.).
>
> > - When a job is restarted, is generateInitialSplits() invoked again?
> >   - if yes, do you expect 'desiredNumSplits' for generateInitialSplits()
> >     to stay the same as in the previous run?
>
> As mentioned above, the execution itself is a constant
> "restart/resubmission" of jobs, and upon a "real" restart (due to failure
> recovery or a user-initiated stop-start) the runner is supposed to preserve
> the original *desiredNumSplits*, and the UnboundedSource is expected to
> consistently re-assign the splits.
>
> > - if no, are the readers instantiated from previous runs?
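For what it's worth, a deterministic round-robin assignment along these lines
would keep previously assigned partitions stable as new ones are appended (a
sketch, assuming a single topic - Kafka only ever appends partitions - and
that desiredNumSplits stays constant across calls, as discussed above; with
multiple topics, a partition added to one topic could shift the indices of
the topics that sort after it):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    static List<List<TopicPartition>> assignSplits(
        List<TopicPartition> partitions, int desiredNumSplits) {
      // Sort by <topic, partition> so every call sees the same order.
      List<TopicPartition> sorted = new ArrayList<>(partitions);
      sorted.sort(Comparator.comparing(TopicPartition::topic)
          .thenComparingInt(TopicPartition::partition));
      List<List<TopicPartition>> splits = new ArrayList<>();
      for (int i = 0; i < desiredNumSplits; i++) {
        splits.add(new ArrayList<>());
      }
      // Partition i always lands in split i % desiredNumSplits, so (within a
      // single topic) appending a new partition never moves an existing one.
      for (int i = 0; i < sorted.size(); i++) {
        splits.get(i % desiredNumSplits).add(sorted.get(i));
      }
      return splits;
    }

This is close in spirit to the sorted round-robin that KafkaIO's
generateInitialSplits() does today, as far as I can tell.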

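And to make the "constant desiredNumSplits" point concrete, a toy loop that
mimics the per-interval re-evaluation (numBatches and the println stand in
for the endless streaming loop; names here are illustrative only):

    import java.util.List;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    static <T, M extends UnboundedSource.CheckpointMark> void simulateBatches(
        UnboundedSource<T, M> source, PipelineOptions options,
        int desiredNumSplits, int numBatches) throws Exception {
      for (int batch = 0; batch < numBatches; batch++) {
        // Re-evaluated every "batch interval"; because desiredNumSplits never
        // changes, the source should hand back a consistent assignment, so
        // split i keeps the partitions it was reading in the previous batch.
        List<? extends UnboundedSource<T, M>> splits =
            source.generateInitialSplits(desiredNumSplits, options);
        System.out.println("batch " + batch + ": " + splits.size() + " splits");
      }
    }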