Thanks Max! I'll try to explain Spark's stateful operators and how/why I used them with UnboundedSource.

Spark has two stateful operators: *updateStateByKey* and *mapWithState*. Since updateStateByKey is bound to output the (updated) state itself - the CheckpointMark in our case - we're left with mapWithState. mapWithState provides a persistent, distributed "map-like" structure that is partitioned according to the stream, and this is indeed how I manage state between micro-batches. However, mapWithState (like any map) will give you the value (state) corresponding to a specific key, so I use a running id assigned at the initial splitting to identify the appropriate state. I took a look at Flink's implementation (I do that sometimes ;-) ) and I could do the same and save the split source together with the CheckpointMark, but it would still have to correspond to the same id, and since I had to wrap the split source anyway to perform a sort of "BoundedReadFromUnboundedSource", I simply added an id field and I'm hashing by that id.

I'll also add that the stateful operator can only be applied to a (Pair)Stream and not to input operators, so I'm actually generating a stream of splits (the same ones for every micro-batch) and reading from within the mappingFunction of the mapWithState - see the sketch below. It's not the simplest design, but given how Spark's persistent state and InputDStream are designed compared to the Beam model, I don't see another way - though I'd be happy to hear one!
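Roughly, the idea looks like this (a simplified sketch, not the actual branch code - the read limit and the Coder-based encode/decode helpers are illustrative; note that on Spark 1.6.x the Optional in the mapping function is Guava's):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class StatefulSourceReadSketch<T> implements java.io.Serializable {

  public JavaMapWithStateDStream<Integer, UnboundedSource<T, ?>, byte[], List<T>> readStatefully(
      JavaPairDStream<Integer, UnboundedSource<T, ?>> splits, // same (id, split) pairs each batch
      PipelineOptions options) {

    // mapWithState calls this once per key (= split id) per micro-batch,
    // handing it the state Spark persisted for that key last time around.
    Function3<Integer, Optional<UnboundedSource<T, ?>>, State<byte[]>, List<T>> readFunc =
        (splitId, source, state) -> {
          UnboundedSource.CheckpointMark mark =
              state.exists() ? decodeMark(state.get()) : null; // resume point, if any
          UnboundedSource.UnboundedReader<T> reader = createReader(source.get(), options, mark);
          List<T> records = new ArrayList<>();
          // read a bounded chunk of the unbounded source, a la BoundedReadFromUnboundedSource
          for (boolean more = reader.start(); more && records.size() < 1000;
              more = reader.advance()) {
            records.add(reader.getCurrent());
          }
          state.update(encodeMark(reader.getCheckpointMark())); // persist for the next batch
          return records;
        };

    return splits.mapWithState(StateSpec.function(readFunc));
  }

  // Illustrative helpers: real code would use the mark's Coder and handle
  // generics and options serialization properly.
  @SuppressWarnings({"unchecked", "rawtypes"})
  private UnboundedSource.UnboundedReader<T> createReader(
      UnboundedSource<T, ?> source, PipelineOptions options, UnboundedSource.CheckpointMark mark)
      throws IOException {
    return ((UnboundedSource) source).createReader(options, mark);
  }

  private UnboundedSource.CheckpointMark decodeMark(byte[] bytes) {
    throw new UnsupportedOperationException("Coder-based decoding goes here");
  }

  private byte[] encodeMark(UnboundedSource.CheckpointMark mark) {
    throw new UnsupportedOperationException("Coder-based encoding goes here");
  }
}

Since the splits stream re-emits the same (id, split) pairs every micro-batch, each key's mappingFunction fires once per batch, reads a bounded chunk, and persists its new mark for the next batch to pick up.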
Pretty sure I've added these here before, but no harm in adding the links again: the design doc <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing> and a work-in-progress branch <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, both mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658> as well. The design doc also relates to how "pure" Spark works with Kafka, which I think is interesting and very different from Flink/Dataflow.

Hope this helped clear things up a little; please keep on asking if something is not clear yet.

Thanks,
Amit.

On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <m...@apache.org> wrote:

> Just to add a comment from the Flink side and its UnboundedSourceWrapper:
> we experienced that the only way to guarantee deterministic splitting of
> the source was to generate the splits upon creation of the source and
> then checkpoint the assignment during runtime. When restoring from a
> checkpoint, the same reader configuration is restored. It's not possible
> to change the splitting after the initial splitting has taken place.
> However, Flink will soon be able to repartition the operator state upon
> restart/rescaling of a job.
>
> Does Spark have a way to pass state of a previous mini batch to the
> current mini batch? If so, you could restore the last configuration and
> continue reading from the checkpointed offset. You just have to
> checkpoint before the mini batch ends.
>
> -Max
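For anyone following along, the pattern Max describes boils down to something like the sketch below (against Flink's pre-1.2 Checkpointed interface, not the actual UnboundedSourceWrapper; the createReader/encodeMark helpers are made up, and a real source would also back off while idle and serialize its PipelineOptions):

import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class SplitCheckpointingSourceSketch<T> extends RichParallelSourceFunction<T>
    implements Checkpointed<byte[]> { // pre-1.2 interface; rescalable state changes this

  private final List<? extends UnboundedSource<T, ?>> splits; // generated once, at source creation
  private transient UnboundedSource.UnboundedReader<T> reader;
  private byte[] restoredMark; // set via restoreState() before run()
  private volatile boolean running = true;

  public SplitCheckpointingSourceSketch(List<? extends UnboundedSource<T, ?>> splits) {
    this.splits = splits; // Beam Sources are Serializable, so they ship with the function
  }

  @Override
  public void run(SourceContext<T> ctx) throws Exception {
    // Deterministic assignment: subtask i always reads split i, so after a
    // restore each checkpointed mark lands back on the split it came from.
    UnboundedSource<T, ?> split = splits.get(getRuntimeContext().getIndexOfThisSubtask());
    reader = createReader(split, restoredMark); // illustrative: decodes the mark, opens a reader
    boolean available = reader.start();
    while (running) {
      if (available) {
        synchronized (ctx.getCheckpointLock()) { // don't emit while a snapshot is taken
          ctx.collect(reader.getCurrent());
        }
      }
      available = reader.advance(); // a real source would back off when this stays false
    }
  }

  @Override
  public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    return encodeMark(reader.getCheckpointMark()); // the reader's resume point
  }

  @Override
  public void restoreState(byte[] state) {
    this.restoredMark = state;
  }

  @Override
  public void cancel() {
    running = false;
  }

  // Illustrative helpers; real code would use the CheckpointMark's Coder.
  private UnboundedSource.UnboundedReader<T> createReader(
      UnboundedSource<T, ?> split, byte[] encodedMark) {
    throw new UnsupportedOperationException("decode the mark, call split.createReader(...)");
  }

  private byte[] encodeMark(UnboundedSource.CheckpointMark mark) {
    throw new UnsupportedOperationException("Coder-based encoding goes here");
  }
}

The key point is that the split list is baked into the source before execution, so the subtask index alone re-pairs each checkpointed mark with its split on restore.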
> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> > Hi Amit,
> >
> > Thanks for the explanation.
> >
> > For 4, you are right, it's slightly different from DataXchange (which
> > relates to the elements in the PCollection). I think storing the
> > "starting point" for a reader makes sense.
> >
> > Regards
> > JB
> >
> > On 10/10/2016 10:33 AM, Amit Sela wrote:
> > >
> > > Inline, thanks JB!
> > >
> > > On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > > wrote:
> > >
> > > > Hi Amit,
> > > >
> > > > For 1, the runner is responsible for the checkpoint storage
> > > > (associated with the source). It's the way for the runner to retry
> > > > and know the failed bundles.
> > >
> > > True, this was a recap/summary of another, not-so-clear, thread.
> > >
> > > > For 4, are you proposing that KafkaRecord store additional metadata
> > > > for that? It sounds like what I proposed in the "Technical Vision"
> > > > appendix document: there I proposed to introduce a DataXchange object
> > > > that stores some additional metadata (like the offset) used by the
> > > > runner. It would be the same with SDF, as the tracker state should be
> > > > persistent as well.
> > >
> > > I think I was more focused on persisting the "starting point" for a
> > > reader, even if no records were read (yet), so that the next time the
> > > reader attempts to read it will pick up there. This has more to do with
> > > how the CheckpointMark handles this. I have to say that I'm not
> > > familiar with your DataXchange proposal; I will take a look though.
> > >
> > > > Regards
> > > > JB
> > > >
> > > > On 10/08/2016 01:55 AM, Amit Sela wrote:
> > > > >
> > > > > I started a thread about (suggesting) UnboundedSource splitIds and
> > > > > it turned into an UnboundedSource/KafkaIO discussion, so I think
> > > > > it's best to start over in a clear [DISCUSS] thread.
> > > > >
> > > > > When working on UnboundedSource support for the Spark runner, I
> > > > > raised some questions; some were about UnboundedSource in general,
> > > > > and others were Kafka-specific. I'd like to recap them here, and
> > > > > maybe have a more productive and well-documented discussion for
> > > > > everyone.
> > > > >
> > > > > 1. UnboundedSource ids - I assume any runner persists the
> > > > > UnboundedSource's CheckpointMark for fault-tolerance, but I wonder
> > > > > how it matches the appropriate split (of the UnboundedSource) to
> > > > > its previously persisted CheckpointMark in any specific worker?
> > > > > *Thomas Groh* mentioned that Source splits have to have an
> > > > > associated identifier, and so the runner gets to tag splits however
> > > > > it pleases, so long as those tags don't allow splits to bleed into
> > > > > each other.
> > > > > 2. Consistent splitting - an UnboundedSource seems to require
> > > > > consistent splitting if it is to "pick up where it left off",
> > > > > correct? This is not mentioned as a requirement or a recommendation
> > > > > in UnboundedSource#generateInitialSplits(), so is this a Kafka-only
> > > > > issue? *Raghu Angadi* mentioned that Kafka already does so by
> > > > > applying partitions to readers in a round-robin manner.
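For reference, that round-robin assignment amounts to something like this (illustrative, not KafkaIO's exact code - it sorts the partitions first so repeated splitting over the same topic yields the same assignment):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

public class RoundRobinAssignSketch {
  public static List<List<TopicPartition>> assign(
      List<TopicPartition> partitions, int desiredNumSplits) {
    // Sort first so every run over the same partitions yields the same order.
    List<TopicPartition> sorted = new ArrayList<>(partitions);
    sorted.sort(Comparator.comparing(TopicPartition::topic)
        .thenComparingInt(TopicPartition::partition));

    int numSplits = Math.min(desiredNumSplits, sorted.size());
    List<List<TopicPartition>> splits = new ArrayList<>(numSplits);
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % numSplits).add(sorted.get(i)); // partition i -> split i % numSplits
    }
    return splits;
  }
}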
> > > > > *Thomas Groh* also added that while the UnboundedSource API
> > > > > doesn't require deterministic splitting (although it's
> > > > > recommended), a PipelineRunner should keep track of the initially
> > > > > generated splits.
> > > > > 3. Support reading of Kafka partitions that were added to topic/s
> > > > > while a Pipeline reads from them - BEAM-727
> > > > > <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> > > > > 4. Reading/persisting Kafka start offsets - since Spark works in
> > > > > micro-batches, if "latest" was applied on a fairly sparse topic,
> > > > > each worker would actually begin reading only after it saw a
> > > > > message during the time window it had to read messages. This is
> > > > > because fetching the offsets is done by the worker running the
> > > > > Reader, which means that each Reader sees a different state of
> > > > > "latest" (for its partition/s), such that a failing Reader that
> > > > > hasn't read yet might fetch a different "latest" once it's
> > > > > recovered than what it originally fetched. While this may not be
> > > > > as painful for other runners, IMHO it lacks correctness, and I'd
> > > > > suggest either reading the Kafka cluster's metadata once upon
> > > > > initial splitting, or adding some of it to the CheckpointMark.
> > > > > Filed BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
> > > > >
> > > > > The original thread is called "Should UnboundedSource provide a
> > > > > split identifier?".
> > > > >
> > > > > While the only specific implementation of UnboundedSource
> > > > > discussed here is Kafka, it is probably the most popular
> > > > > open-source UnboundedSource. Having said that, I wonder where this
> > > > > meets PubSub, or any other UnboundedSource that those questions
> > > > > might affect.
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > >
> > > > --
> > > > Jean-Baptiste Onofré
> > > > jbono...@apache.org
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com