That's a good question, Robert, and I did. First of all, an UnboundedSource is split into splits that implement a sort of "BoundedReadFromUnboundedSource", with Restrictions on time and (optionally) number of records - this seems to fit nicely into the *SDF* language.
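To make the restriction idea concrete, here is a minimal sketch (illustrative names only, not the actual Beam BoundedReadFromUnboundedSource class) of draining a reader under both a record-count and a time restriction:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch: bound an unbounded read by whichever restriction
// trips first - max records read, or time budget spent.
class BoundedReadSketch {
  static <T> List<T> readBounded(Iterator<T> reader, int maxRecords, long maxMillis) {
    List<T> out = new ArrayList<>();
    long deadline = System.currentTimeMillis() + maxMillis;
    while (out.size() < maxRecords
        && System.currentTimeMillis() < deadline
        && reader.hasNext()) {
      out.add(reader.next());
    }
    return out; // the rest of the stream is left for the next micro-batch
  }
}
```

Each micro-batch would run such a bounded read per split and persist where it stopped, which is what the CheckpointMark is for in the real implementation.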
Taking a look at the diagram in the Spark runner's UnboundedSource design doc, and the diagram placed just above "Restrictions, blocks and positions <https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#heading=h.vjs7pzbb7kw>", there seems to be a lot of resemblance.
Finally, the idea of reading from within a DoFn is exactly what I'm doing here: reading from within mapWithState's "mappingFunction" - a function that maps <K, Option<V>, State> *=>* Iterator<ReadElementsT>, i.e. mapping a Source (and its State, and possibly Restrictions) into a bunch of read records.
This all *seems* to be a good fit, but I'll probably have to keep following closely to see how the API is forming.

Thanks,
Amit

On Mon, Oct 10, 2016 at 8:23 PM Robert Bradshaw <rober...@google.com.invalid> wrote:

> Just looking to the future, have you given any thought on how well
> this would work on https://s.apache.org/splittable-do-fn?
>
> On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela <amitsel...@gmail.com> wrote:
> > Thanks Max!
> >
> > I'll try to explain Spark's stateful operators and how/why I used them
> > with UnboundedSource.
> >
> > Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> > Since updateStateByKey is bound to output the (updated) state itself -
> > the CheckpointMark in our case - we're left with mapWithState.
> > mapWithState provides a persistent, distributed "map-like" state that is
> > partitioned according to the stream. This is indeed how I manage state
> > between micro-batches.
> > However, mapWithState (like any map) will give you a value (state)
> > corresponding to a specific key, so I use a running-id from the initial
> > splitting to identify the appropriate state.
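The state-keyed-by-split-id mechanics described above can be modeled roughly like this (plain Java modeling the semantics only - not Spark's actual mapWithState API, and the names are mine):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the mappingFunction's job: given a split id (the
// key), the split's data (the value) and persisted CheckpointMark-like
// state, emit the records read this micro-batch and update the state.
class MappingFunctionSketch {
  // State: last offset read per split id, persisted across micro-batches.
  final Map<Integer, Long> state = new HashMap<>();

  List<String> mapWithState(int splitId, List<String> splitData, int maxRecords) {
    long offset = state.getOrDefault(splitId, 0L);
    List<String> read = new ArrayList<>();
    while (offset < splitData.size() && read.size() < maxRecords) {
      read.add(splitData.get((int) offset++));
    }
    state.put(splitId, offset); // checkpoint: next micro-batch resumes here
    return read;
  }
}
```

Because lookups are keyed by the running id assigned at initial splitting, each split always finds the state it persisted, regardless of which micro-batch is running.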
> > I took a look at Flink's implementation (I do that sometimes ;-) ) and
> > I could do the same and save the split source with the CheckpointMark,
> > but it'll still have to correspond to the same id, and since I had to
> > wrap the split Source to perform a sort of
> > "BoundedReadFromUnboundedSource" I simply added an id field and I'm
> > hashing by that id.
> > I'll also add that the stateful operator can only be applied to a
> > (Pair)Stream and not to input operators, so I'm actually generating a
> > stream of splits (the same ones for every micro-batch) and reading from
> > within the mappingFunction of the mapWithState.
> >
> > It's not the simplest design, but given how Spark's persistent state
> > and InputDStream are designed compared to the Beam model, I don't see
> > another way - though I'd be happy to hear one!
> >
> > Pretty sure I've added this here, but no harm in adding the link again:
> > design doc
> > <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> > and a work-in-progress branch
> > <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, all
> > mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658>
> > as well.
> > The design doc also relates to how "pure" Spark works with Kafka, which
> > I think is interesting and very different from Flink/Dataflow.
> >
> > Hope this helped clear things up a little; please keep on asking if
> > something is not clear yet.
> >
> > Thanks,
> > Amit.
> >
> > On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <m...@apache.org>
> > wrote:
> >
> >> Just to add a comment from the Flink side and its
> >> UnboundedSourceWrapper: we found that the only way to guarantee
> >> deterministic splitting of the source was to generate the splits upon
> >> creation of the source and then checkpoint the assignment during
> >> runtime.
> >> When restoring from a checkpoint, the same reader configuration is
> >> restored. It's not possible to change the splitting after the initial
> >> splitting has taken place. However, Flink will soon be able to
> >> repartition the operator state upon restart/rescaling of a job.
> >>
> >> Does Spark have a way to pass the state of a previous mini-batch to
> >> the current mini-batch? If so, you could restore the last
> >> configuration and continue reading from the checkpointed offset. You
> >> just have to checkpoint before the mini-batch ends.
> >>
> >> -Max
> >>
> >> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> >> wrote:
> >>
> >> > Hi Amit,
> >> >
> >> > thanks for the explanation.
> >> >
> >> > For 4, you are right, it's slightly different from DataXchange
> >> > (related to the elements in the PCollection). I think storing the
> >> > "starting point" for a reader makes sense.
> >> >
> >> > Regards
> >> > JB
> >> >
> >> > On 10/10/2016 10:33 AM, Amit Sela wrote:
> >> >>
> >> >> Inline, thanks JB!
> >> >>
> >> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> >> >> wrote:
> >> >>
> >> >>> Hi Amit,
> >> >>>
> >> >>> For 1., the runner is responsible for the checkpoint storage
> >> >>> (associated with the source). It's the way for the runner to retry
> >> >>> and know the failed bundles.
> >> >>
> >> >> True, this was a recap/summary of another, not-so-clear, thread.
> >> >>
> >> >>> For 4, are you proposing that KafkaRecord store additional
> >> >>> metadata for that?
> >> >>> It sounds like what I proposed in the "Technical Vision" appendix
> >> >>> document: there I proposed to introduce a DataXchange object that
> >> >>> stores some additional metadata (like offset) used by the runner.
> >> >>> It would be the same with SDF, as the tracker state should be
> >> >>> persistent as well.
> >> >>
> >> >> I think I was more focused on persisting the "starting point" for a
> >> >> reader, even if no records were read (yet), so that the next time
> >> >> the reader attempts to read it will pick up there. This has more to
> >> >> do with how the CheckpointMark handles this.
> >> >> I have to say that I'm not familiar with your DataXchange proposal;
> >> >> I will take a look though.
> >> >>
> >> >>> Regards
> >> >>> JB
> >> >>>
> >> >>> On 10/08/2016 01:55 AM, Amit Sela wrote:
> >> >>>
> >> >>>> I started a thread about (suggesting) UnboundedSource splitIds
> >> >>>> and it turned into an UnboundedSource/KafkaIO discussion, and I
> >> >>>> think it's best to start over in a clear [DISCUSS] thread.
> >> >>>>
> >> >>>> When working on UnboundedSource support for the Spark runner,
> >> >>>> I've raised some questions; some were general-UnboundedSource,
> >> >>>> and others Kafka-specific.
> >> >>>>
> >> >>>> I'd like to recap them here, and maybe have a more productive and
> >> >>>> well-documented discussion for everyone.
> >> >>>>
> >> >>>> 1. UnboundedSource ids - I assume any runner persists the
> >> >>>> UnboundedSource's CheckpointMark for fault-tolerance, but I
> >> >>>> wonder how it matches the appropriate split (of the
> >> >>>> UnboundedSource) to its previously persisted CheckpointMark in
> >> >>>> any specific worker?
> >> >>>> *Thomas Groh* mentioned that Source splits have to have an
> >> >>>> associated identifier, and so the runner gets to tag splits
> >> >>>> however it pleases, so long as those tags don't allow splits to
> >> >>>> bleed into each other.
> >> >>>> 2. Consistent splitting - UnboundedSource splitting seems to
> >> >>>> require consistent splitting if it were to "pick up where it left
> >> >>>> off", correct? This is not mentioned as a requirement or a
> >> >>>> recommendation in UnboundedSource#generateInitialSplits(), so is
> >> >>>> this a Kafka-only issue?
> >> >>>> *Raghu Angadi* mentioned that Kafka already does so by applying
> >> >>>> partitions to readers in a round-robin manner.
> >> >>>> *Thomas Groh* also added that while the UnboundedSource API
> >> >>>> doesn't require deterministic splitting (although it's
> >> >>>> recommended), a PipelineRunner should keep track of the initially
> >> >>>> generated splits.
> >> >>>> 3. Support reading of Kafka partitions that were added to topic/s
> >> >>>> while a Pipeline reads from them - BEAM-727
> >> >>>> <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> >> >>>> 4. Reading/persisting Kafka start offsets - since Spark works in
> >> >>>> micro-batches, if "latest" was applied on a fairly sparse topic,
> >> >>>> each worker would actually begin reading only after it saw a
> >> >>>> message during the time window it had to read messages. This is
> >> >>>> because fetching the offsets is done by the worker running the
> >> >>>> Reader. This means that each Reader sees a different state of
> >> >>>> "latest" (for its partition/s), such that a failing Reader that
> >> >>>> hasn't read yet might fetch a different "latest" once it's
> >> >>>> recovered than what it originally fetched. While this may not be
> >> >>>> as painful for other runners, IMHO it lacks correctness, and I'd
> >> >>>> suggest either reading the Kafka cluster's metadata once upon
> >> >>>> initial splitting, or adding some of it to the CheckpointMark.
> >> >>>> Filed BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
> >> >>>>
> >> >>>> The original thread is called "Should UnboundedSource provide a
> >> >>>> split identifier ?".
> >> >>>> While the only specific implementation of UnboundedSource
> >> >>>> discussed here is Kafka, it is probably the most popular
> >> >>>> open-source UnboundedSource. Having said that, I wonder where
> >> >>>> this meets PubSub, or any other UnboundedSource that those
> >> >>>> questions might affect.
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Amit
> >> >>>
> >> >>> --
> >> >>> Jean-Baptiste Onofré
> >> >>> jbono...@apache.org
> >> >>> http://blog.nanthrax.net
> >> >>> Talend - http://www.talend.com
> >> >
> >> > --
> >> > Jean-Baptiste Onofré
> >> > jbono...@apache.org
> >> > http://blog.nanthrax.net
> >> > Talend - http://www.talend.com
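On point 4 above (BEAM-704), the suggested fix - resolving "latest" once at initial splitting and persisting it - could look roughly like this (hypothetical names, not KafkaIO's actual types):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the BEAM-704 suggestion: freeze the "latest" offsets once,
// at initial splitting, and carry them in the checkpointed split
// metadata, so a reader that fails before reading anything still
// resumes from the originally fetched offsets rather than re-resolving
// "latest" against the cluster.
class StartOffsetsSketch {
  static class SplitCheckpoint {
    final Map<Integer, Long> startOffsets; // partition -> frozen start offset
    SplitCheckpoint(Map<Integer, Long> startOffsets) {
      // Defensive copy: later changes to "latest" must not leak in.
      this.startOffsets = new HashMap<>(startOffsets);
    }
  }

  // latestPerPartition stands in for querying the cluster's metadata
  // once, at initial splitting time.
  static SplitCheckpoint initialSplit(Map<Integer, Long> latestPerPartition) {
    return new SplitCheckpoint(latestPerPartition);
  }

  static long resumeOffset(SplitCheckpoint cp, int partition) {
    // On recovery, use the frozen offset; never re-ask for "latest".
    return cp.startOffsets.get(partition);
  }
}
```

With this, every Reader (including a recovered one that had read nothing yet) sees the same start state, which addresses the correctness concern raised in point 4.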