Just looking to the future, have you given any thought to how well this would work with https://s.apache.org/splittable-do-fn?
On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela <[email protected]> wrote:
> Thanks Max!
>
> I'll try to explain Spark's stateful operators and how/why I used them
> with UnboundedSource.
>
> Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> Since updateStateByKey is bound to output the (updated) state itself - the
> CheckpointMark in our case - we're left with mapWithState.
> mapWithState provides a persistent, distributed "map-like" structure that
> is partitioned according to the stream. This is indeed how I manage state
> between micro-batches.
> However, mapWithState (like any map) will give you a value (state)
> corresponding to a specific key, so I use a running id from the initial
> splitting to identify the appropriate state.
> I took a look at Flink's implementation (I do that sometimes ;-)) and I
> could do the same and save the split source together with the
> CheckpointMark, but it would still have to correspond to the same id, and
> since I had to wrap the split source to perform a sort of
> "BoundedReadFromUnboundedSource", I simply added an id field and I'm
> hashing by that id.
> I'll also add that the stateful operator can only be applied to a
> (Pair)Stream and not to input operators, so I'm actually generating a
> stream of splits (the same ones for every micro-batch) and reading from
> within the mappingFunction of the mapWithState.
>
> It's not the simplest design, but given how Spark's persistent state and
> InputDStream are designed compared to the Beam model, I don't see another
> way - though I'd be happy to hear one!
>
> Pretty sure I've added this here, but no harm in adding the links again:
> the design doc
> <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> and a work-in-progress branch
> <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, all
> mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658>
> as well.
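The keyed-state scheme described above - tagging each split with a stable id at initial-split time and keeping that split's CheckpointMark in a keyed store that survives across micro-batches (the role mapWithState plays in the Spark runner) - can be sketched in plain Java. This is a minimal sketch: the class and method names are illustrative, not Beam or Spark API, and the "checkpoint" is reduced to a single offset.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of keyed checkpoint state across micro-batches: each split
 * of an UnboundedSource is tagged with a stable id at initial-split time,
 * and the split's checkpoint (here just a long offset) is looked up and
 * updated under that same id on every micro-batch.
 * Names are illustrative, not Beam/Spark API.
 */
public class KeyedCheckpointStore {
    // Persistent keyed state: split id -> last checkpointed offset.
    private final Map<Integer, Long> checkpoints = new HashMap<>();

    /**
     * One micro-batch's worth of reading for a given split: restore the
     * checkpoint for this split id (or start at 0), pretend to consume
     * maxRecords, and persist the new checkpoint under the same id.
     */
    public long readBatch(int splitId, long maxRecords) {
        long start = checkpoints.getOrDefault(splitId, 0L);
        long end = start + maxRecords;
        checkpoints.put(splitId, end); // checkpoint before the batch ends
        return end;
    }

    public long checkpointFor(int splitId) {
        return checkpoints.getOrDefault(splitId, 0L);
    }

    public static void main(String[] args) {
        KeyedCheckpointStore store = new KeyedCheckpointStore();
        // Two splits read independently; each resumes from its own checkpoint.
        store.readBatch(0, 10);
        store.readBatch(1, 5);
        store.readBatch(0, 10);
        System.out.println(store.checkpointFor(0)); // 20
        System.out.println(store.checkpointFor(1)); // 5
    }
}
```

The point of keying by a split id assigned at initial splitting is that the id stays stable across micro-batches, so the same split always finds its own state and splits cannot bleed into each other.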
> The design doc also relates to how "pure" Spark works with Kafka, which I
> think is interesting and very different from Flink/Dataflow.
>
> Hope this helped clear things up a little; please keep on asking if
> something is not clear yet.
>
> Thanks,
> Amit.
>
> On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <[email protected]> wrote:
>> Just to add a comment from the Flink side and its UnboundedSourceWrapper:
>> we found that the only way to guarantee deterministic splitting of the
>> source was to generate the splits upon creation of the source and then
>> checkpoint the assignment during runtime. When restoring from a
>> checkpoint, the same reader configuration is restored. It's not possible
>> to change the splitting after the initial splitting has taken place.
>> However, Flink will soon be able to repartition the operator state upon
>> restart/rescaling of a job.
>>
>> Does Spark have a way to pass state from a previous mini-batch to the
>> current mini-batch? If so, you could restore the last configuration and
>> continue reading from the checkpointed offset. You just have to
>> checkpoint before the mini-batch ends.
>>
>> -Max
>>
>> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>> Hi Amit,
>>>
>>> thanks for the explanation.
>>>
>>> For 4, you are right, it's slightly different from DataXchange (related
>>> to the elements in the PCollection). I think storing the "starting
>>> point" for a reader makes sense.
>>>
>>> Regards
>>> JB
>>>
>>> On 10/10/2016 10:33 AM, Amit Sela wrote:
>>>> Inline, thanks JB!
>>>>
>>>> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>> Hi Amit,
>>>>>
>>>>> For 1., the runner is responsible for the checkpoint storage
>>>>> (associated with the source).
>>>>> It's the way for the runner to retry and know the failed bundles.
>>>>
>>>> True, this was a recap/summary of another, not-so-clear, thread.
>>>>
>>>>> For 4, are you proposing that KafkaRecord store additional metadata
>>>>> for that? It sounds like what I proposed in the "Technical Vision"
>>>>> appendix document: there I proposed to introduce a DataXchange object
>>>>> that stores some additional metadata (like the offset) used by the
>>>>> runner. It would be the same with SDF, as the tracker state should be
>>>>> persistent as well.
>>>>
>>>> I think I was more focused on persisting the "starting point" for a
>>>> reader, even if no records were read (yet), so that the next time the
>>>> reader attempts to read it will pick up there. This has more to do
>>>> with how the CheckpointMark handles this.
>>>> I have to say that I'm not familiar with your DataXchange proposal; I
>>>> will take a look though.
>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 10/08/2016 01:55 AM, Amit Sela wrote:
>>>>>> I started a thread about (suggesting) UnboundedSource splitIds and
>>>>>> it turned into an UnboundedSource/KafkaIO discussion, and I think
>>>>>> it's best to start over in a clear [DISCUSS] thread.
>>>>>>
>>>>>> When working on UnboundedSource support for the Spark runner, I've
>>>>>> raised some questions; some were general-UnboundedSource, and others
>>>>>> Kafka-specific.
>>>>>>
>>>>>> I'd like to recap them here, and maybe have a more productive and
>>>>>> well-documented discussion for everyone.
>>>>>> 1. UnboundedSource ids - I assume any runner persists the
>>>>>> UnboundedSource's CheckpointMark for fault-tolerance, but I wonder
>>>>>> how it matches the appropriate split (of the UnboundedSource) to its
>>>>>> previously persisted CheckpointMark in any specific worker?
>>>>>> *Thomas Groh* mentioned that Source splits have to have an
>>>>>> associated identifier, and so the runner gets to tag splits however
>>>>>> it pleases, so long as those tags don't allow splits to bleed into
>>>>>> each other.
>>>>>> 2. Consistent splitting - an UnboundedSource seems to require
>>>>>> consistent splitting if it is to "pick up where it left off",
>>>>>> correct? This is not mentioned as a requirement or a recommendation
>>>>>> in UnboundedSource#generateInitialSplits(), so is this a Kafka-only
>>>>>> issue?
>>>>>> *Raghu Angadi* mentioned that Kafka already does so by assigning
>>>>>> partitions to readers in a round-robin manner.
>>>>>> *Thomas Groh* also added that while the UnboundedSource API doesn't
>>>>>> require deterministic splitting (although it's recommended), a
>>>>>> PipelineRunner should keep track of the initially generated splits.
>>>>>> 3. Support reading from Kafka partitions that were added to topic(s)
>>>>>> while a Pipeline reads from them - BEAM-727
>>>>>> <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>>>>>> 4. Reading/persisting Kafka start offsets - since Spark works in
>>>>>> micro-batches, if "latest" was applied on a fairly sparse topic,
>>>>>> each worker would actually begin reading only after it saw a message
>>>>>> during the time window it had to read messages. This is because
>>>>>> fetching the offsets is done by the worker running the Reader. This
>>>>>> means that each Reader sees a different state of "latest" (for its
>>>>>> partition(s)), such that a failing Reader that hasn't read yet might
>>>>>> fetch a different "latest" once it's recovered than what it
>>>>>> originally fetched. While this may not be as painful for other
>>>>>> runners, IMHO it lacks correctness, and I'd suggest either reading
>>>>>> the Kafka cluster's metadata once upon initial splitting, or adding
>>>>>> some of it to the CheckpointMark. Filed BEAM-704
>>>>>> <https://issues.apache.org/jira/browse/BEAM-704>.
>>>>>>
>>>>>> The original thread is called "Should UnboundedSource provide a
>>>>>> split identifier ?".
>>>>>>
>>>>>> While the only specific implementation of UnboundedSource discussed
>>>>>> here is Kafka, it is probably the most popular open-source
>>>>>> UnboundedSource. Having said that, I wonder where this meets PubSub?
>>>>>> Or any other UnboundedSource that those questions might affect.
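The correctness issue in point 4 - each worker resolving "latest" on its own, possibly at a different moment than a failed predecessor did - and the suggested fix can be illustrated with a small plain-Java sketch. This is only an illustration under stated assumptions: the `Split` class and the offset-lookup function are hypothetical, not the Beam/KafkaIO API. The idea is to resolve "latest" once at initial-split time and record the resolved offset in each split, so a recovering reader reuses the recorded answer instead of asking again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntToLongFunction;

/**
 * Sketch of the fix suggested in point 4: resolve "latest" offsets once,
 * when the source is initially split, and bake the resolved start offset
 * into each split. A reader that fails before reading anything restarts
 * from the recorded offset instead of re-resolving "latest" (which may
 * have moved in the meantime). All names are illustrative, not Beam or
 * KafkaIO API.
 */
public class StartOffsetAtSplitTime {
    /** A split that remembers the start offset resolved at split time. */
    static final class Split {
        final int partition;
        final long startOffset; // resolved once, at initial splitting
        Split(int partition, long startOffset) {
            this.partition = partition;
            this.startOffset = startOffset;
        }
    }

    /**
     * Generate the initial splits, resolving "latest" per partition via
     * the supplied lookup (a stand-in for one metadata request to the
     * cluster, made once by whoever performs the initial splitting).
     */
    static List<Split> generateInitialSplits(int numPartitions,
                                             IntToLongFunction latestOffsetOf) {
        List<Split> splits = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            splits.add(new Split(p, latestOffsetOf.applyAsLong(p)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // "latest" at split time: partition p happens to be at offset 100 * p.
        List<Split> splits = generateInitialSplits(3, p -> 100L * p);
        // Even if the topic advances afterwards, a recovered reader for
        // partition 2 still starts at the offset recorded in its split.
        System.out.println(splits.get(2).startOffset); // 200
    }
}
```

The same effect is achieved by the thread's alternative suggestion of carrying the resolved offset in the CheckpointMark rather than in the split itself; either way, "latest" is resolved exactly once.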
>>>>>> Thanks,
>>>>>> Amit
>>>>>
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> [email protected]
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
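As a footnote to point 2 in the quoted thread: the round-robin assignment of Kafka partitions to readers that Raghu Angadi mentions is easy to sketch, and the sketch shows why it is deterministic across restarts as long as the partition count and the desired number of splits are unchanged. This is an illustration only, not the actual KafkaIO code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of deterministic round-robin assignment of partitions to splits:
 * partition p always goes to split p % numSplits, so re-running the
 * initial splitting with the same inputs reproduces the same assignment.
 * Illustrative only, not the KafkaIO implementation.
 */
public class RoundRobinSplits {
    static List<List<Integer>> assign(int numPartitions, int numSplits) {
        List<List<Integer>> splits = new ArrayList<>();
        for (int s = 0; s < numSplits; s++) {
            splits.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            splits.get(p % numSplits).add(p); // deterministic for fixed inputs
        }
        return splits;
    }

    public static void main(String[] args) {
        // 5 partitions over 2 splits: split 0 gets [0, 2, 4], split 1 gets [1, 3].
        System.out.println(assign(5, 2));
    }
}
```

Determinism here depends only on the inputs; if partitions are added to the topic after the initial splitting, the assignment no longer covers them, which is exactly the gap BEAM-727 tracks.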
