Thanks Raghu!

WIP PR for UnboundedSource support:
https://github.com/amitsela/incubator-beam/commits/BEAM-658-WIP
JIRA ticket: https://issues.apache.org/jira/browse/BEAM-658
Design document:
https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing
The PR still needs some polishing, and better testing; I'll be working on
it this week.

Thanks,
Amit

On Sun, Oct 9, 2016 at 2:54 AM Raghu Angadi <[email protected]> wrote:

> Thanks for the updates Amit.
>
> One follow up: when does the Spark runner ask for CheckpointMark from the
> reader (at the end of a micro batch?)?

I've wrapped the entire Read within Spark's stateful operator
(mapWithState), so at the end of each read the CheckpointMark is saved.

> Spark runner seems different enough from Dataflow and Flink that it is a
> good showcase for the Beam runner API and implementations. It is no
> surprise that you are seeing some issues; I think we can work these out.
>
> I am interested in updating KafkaIO to support changes in Kafka
> partitions at run time. Looks like it would work for Spark too (as
> desiredNumSplits stays constant).
>
> You might not need to use 'BoundedReadFromUnboundedSource', but that
> could be a future improvement (e.g. the Dataflow runner has full control
> of the life cycle of a source/reader).

I'm using a slightly different version of `BoundedReadFromUnboundedSource`,
and I think I have no choice, because I have to bound the read somehow or
the micro-batch will grow indefinitely.

> I will take a look at the Spark runner.. do you have a branch that you
> are working with?
>
> Raghu.
>
>
> On Sat, Oct 8, 2016 at 11:33 AM, Amit Sela <[email protected]> wrote:
>
> > Some answers inline.
> > @Raghu I'll review the PR tomorrow.
> >
> > Thanks,
> > Amit
> >
> > On Sat, Oct 8, 2016 at 3:47 AM Raghu Angadi <[email protected]>
> > wrote:
> >
> > > On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <[email protected]>
> > > wrote:
> > >
> > > > 3. Support reading of Kafka partitions that were added to topic/s
> > > > while a Pipeline reads from them - BEAM-727
> > > > <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> > >
> > > I think this is doable (assuming some caveats about the
> > > generateInitialSplits() contract). How important is this feature?
> >
> > Well, I don't know how to estimate the importance, but I can say that
> > Kafka allows this, and Spark Streaming supports this - the way Apache
> > Spark currently reads from Kafka is by polling start-end offsets before
> > scheduling every micro-batch job, so if a new partition suddenly
> > appears it's simply added to the list of partitions to read from, and
> > the driver program tracks the offsets.
> > I assume Beam would like to support added partitions eventually, and
> > from what I understand Dataflow will have to restart the pipeline
> > (Flink too?), but it doesn't change the fact that we'll have to assign
> > in a way that keeps previously assigned partitions the same.
> >
> > > Some basic questions about the Spark runner:
> > >
> > > - does the number of partitions stay the same during the life of a
> > > long-running Beam streaming job?
> >
> > They should, yes; otherwise we'll have a consistency issue with
> > partition assignment.
> >
> > > - Will generateInitialSplits() be called more than once during the
> > > life of a job?
> >
> > This is an important point! Yes: every "batch interval" Spark will
> > execute a new "micro" job, and the runner will treat UnboundedSource
> > as a sort of BoundedReadFromUnboundedSource with bounds on time
> > (proportional to the interval time) and possibly records, so
> > generateInitialSplits() will be re-evaluated each time.
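To make that concrete, here is a minimal sketch of what bounding a single
micro-batch read could look like. This is my illustration, not the actual
runner code: the method name and the maxRecords/maxReadTime caps are
hypothetical, while the UnboundedReader/CheckpointMark calls are the real
Beam API.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

class MicroBatchRead {
  // Hypothetical sketch: read from an UnboundedReader until either a
  // record cap or a time bound (proportional to the batch interval) is
  // hit, then return the batch together with the CheckpointMark so the
  // next micro-batch can resume from where this one stopped.
  static <T> KV<List<WindowedValue<T>>, UnboundedSource.CheckpointMark>
      readMicroBatch(UnboundedSource.UnboundedReader<T> reader,
          long maxRecords, Duration maxReadTime) throws IOException {
    List<WindowedValue<T>> batch = new ArrayList<>();
    Instant deadline = Instant.now().plus(maxReadTime);
    boolean hasNext = reader.start();
    while (hasNext
        && batch.size() < maxRecords
        && Instant.now().isBefore(deadline)) {
      batch.add(WindowedValue.timestampedValueInGlobalWindow(
          reader.getCurrent(), reader.getCurrentTimestamp()));
      hasNext = reader.advance();
    }
    // The mark is what the stateful operator (mapWithState) persists,
    // so the reader can be resumed in the next micro-batch.
    return KV.of(batch, reader.getCheckpointMark());
  }
}

In the runner itself the caps would be derived from the batch interval,
and the returned mark is what gets saved through mapWithState as
described above.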
> > That's why the runner should keep the original (first) splitting, by
> > supplying the UnboundedSource with the same *desiredNumSplits* as
> > negotiated at first (against the Spark backend - YARN/Mesos etc.).
> >
> > > - When a job is restarted, is generateInitialSplits() invoked again?
> > >   - if yes, do you expect 'desiredNumSplits' for
> > >     generateInitialSplits() to stay the same as in the previous run?
> >
> > As mentioned above, the execution itself is a constant
> > "restart/resubmission" of jobs, and upon a "real" restart (due to
> > failure recovery or a user-initiated stop-start) the runner is
> > supposed to preserve the original *desiredNumSplits*, and the
> > UnboundedSource is expected to consistently re-assign the splits.
> >
> > >   - if no, are the readers instantiated from previous runs?
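On "consistently re-assign the splits": what I have in mind is a
deterministic assignment, so that when generateInitialSplits() is
re-evaluated with the same *desiredNumSplits*, partitions that were
already assigned keep their split. A minimal sketch, modeled loosely on
the sort-then-round-robin assignment KafkaIO does today (class and method
names here are mine):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

class StableAssignment {
  // Sort partitions by (topic, partition) and deal them out round-robin
  // across a constant desiredNumSplits. Re-running this with the same
  // inputs always yields the same assignment, and for a single topic,
  // newly added partitions (Kafka assigns increasing ids) land at the
  // end of the sorted list, leaving existing assignments untouched.
  static List<List<TopicPartition>> assignToSplits(
      List<TopicPartition> partitions, int desiredNumSplits) {
    List<TopicPartition> sorted = new ArrayList<>(partitions);
    sorted.sort(
        Comparator.comparing(TopicPartition::topic)
            .thenComparingInt(TopicPartition::partition));
    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < desiredNumSplits; i++) {
      splits.add(new ArrayList<TopicPartition>());
    }
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % desiredNumSplits).add(sorted.get(i));
    }
    return splits;
  }
}

Note that with multiple topics a new partition can shift later entries in
the sorted list, which I take to be one of the contract caveats you
mentioned.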
