That's a good question, Robert, and I did.

First of all, an UnboundedSource is split into splits that implement a sort
of "BoundedReadFromUnboundedSource", with restrictions on time and
(optionally) the number of records - this seems to fit nicely into the
*SDF* language.

Comparing the diagram in the Spark runner's UnboundedSource design doc
with the diagram placed just above "Restrictions, blocks and positions
<https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#heading=h.vjs7pzbb7kw>",
there seems to be a lot of resemblance.

Finally, the idea of reading from within a DoFn is exactly what I'm doing
here, reading from within mapWithState's "mappingFunction" - a function
that maps:
 <K, Option<V>, State> *=>* Iterator<ReadElementsT>
mapping a Source (and its State, and possibly restrictions) into a bunch of
read records.
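As a rough, hypothetical sketch of that mappingFunction shape (plain Java, no
Spark dependency; `readFromSplit`, `SplitState`, and the use of a record list
as a stand-in for the split source are all mine, not Beam or Spark API):

```java
import java.util.*;

// Hypothetical stand-in for a split's checkpointed state (e.g. a CheckpointMark).
class SplitState {
    long offset;
}

class MappingFunctionSketch {
    // Shape of mapWithState's mappingFunction: <K, Option<V>, State> => Iterator<ReadElementsT>.
    // K is a split id, V stands in for the (wrapped) split source, and the state
    // is the reader's checkpoint; the output is the records read this micro-batch.
    static Iterator<String> readFromSplit(int splitId,
                                          Optional<List<String>> splitRecords,
                                          SplitState state) {
        if (!splitRecords.isPresent()) {
            return Collections.emptyIterator();
        }
        List<String> all = splitRecords.get();
        // Resume from the checkpointed offset, then advance it, so the next
        // micro-batch picks up where this one left off.
        List<String> batch = all.subList((int) state.offset, all.size());
        state.offset = all.size();
        return batch.iterator();
    }
}
```

Each micro-batch invokes the function with the same split key, so the mutated
state carries the read position across batches.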

This all *seems* to be a good fit, but I'll probably have to keep following
closely to see how the API takes shape.

Thanks,
Amit

On Mon, Oct 10, 2016 at 8:23 PM Robert Bradshaw <rober...@google.com.invalid>
wrote:

> Just looking to the future, have you given any thought on how well
> this would work on https://s.apache.org/splittable-do-fn?
>
> On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela <amitsel...@gmail.com> wrote:
> > Thanks Max!
> >
> > I'll try to explain Spark's stateful operators and how/why I used them
> > with UnboundedSource.
> >
> > Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> > Since updateStateByKey is bound to output the (updated) state itself -
> > the CheckpointMark in our case - we're left with mapWithState.
> > mapWithState provides a persistent, distributed "map-like" structure
> > that is partitioned according to the stream. This is indeed how I manage
> > state between micro-batches.
> > However, mapWithState (like any map) will give you a value (state)
> > corresponding to a specific key, so I use a running id from the initial
> > splitting to identify the appropriate state.
> > I took a look at Flink's implementation (I do that sometimes ;-) ) and I
> > could do the same and save the split source with the CheckpointMark, but
> > it would still have to correspond to the same id, and since I had to
> > wrap the split Source to perform a sort of
> > "BoundedReadFromUnboundedSource" I simply added an id field and I'm
> > hashing by that id.
> > I'll also add that the stateful operator can only be applied to a
> > (Pair)Stream and not to input operators, so I'm actually generating a
> > stream of splits (the same ones for every micro-batch) and reading from
> > within the mappingFunction of the mapWithState.
> >
> > It's not the simplest design, but given how Spark's persistent state and
> > InputDStream are designed compared to the Beam model, I don't see
> > another way - though I'd be happy to hear one!
> >
> > Pretty sure I've added this here, but no harm in adding the link again:
> > the design doc
> > <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> > and a work-in-progress branch
> > <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, all
> > mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658>
> > as well.
> > The design doc also relates to how "pure" Spark works with Kafka, which
> > I think is interesting and very different from Flink/Dataflow.
> >
> > Hope this helped clear things up a little; please keep on asking if
> > something is not clear yet.
> >
> > Thanks,
> > Amit.
> >
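The split-id keyed state Amit describes above can be illustrated roughly as
follows (plain Java, no Spark dependency; `SplitStateStore` and its method
names are hypothetical, not Beam or Spark API):

```java
import java.util.*;

// Illustrative sketch: state per split, keyed by a running id assigned at
// initial splitting, mimicking how mapWithState keys its per-key state.
class SplitStateStore {
    private final Map<Integer, String> checkpointMarks = new HashMap<>();

    // Each micro-batch looks up the split's previous CheckpointMark by its id...
    String resume(int splitId) {
        return checkpointMarks.getOrDefault(splitId, "<start>");
    }

    // ...and stores the new mark once the bounded read for this batch finishes.
    void checkpoint(int splitId, String mark) {
        checkpointMarks.put(splitId, mark);
    }
}
```

The point being that the id must be stable across micro-batches, otherwise a
split would fail to find its own checkpoint.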
> > On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <m...@apache.org>
> > wrote:
> >
> >> Just to add a comment from the Flink side and its
> >> UnboundedSourceWrapper. We experienced that the only way to guarantee
> >> deterministic splitting of the source was to generate the splits upon
> >> creation of the source and then checkpoint the assignment during
> >> runtime. When restoring from a checkpoint, the same reader
> >> configuration is restored. It's not possible to change the splitting
> >> after the initial splitting has taken place. However, Flink will soon
> >> be able to repartition the operator state upon restart/rescaling of a
> >> job.
> >>
> >> Does Spark have a way to pass state of a previous mini batch to the
> >> current mini batch? If so, you could restore the last configuration
> >> and continue reading from the checkpointed offset. You just have to
> >> checkpoint before the mini batch ends.
> >>
> >> -Max
> >>
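Max's generate-splits-once-then-checkpoint-the-assignment approach might be
sketched as below (plain Java; `SourceWrapperSketch` is an illustrative
stand-in, not Flink's actual UnboundedSourceWrapper):

```java
import java.util.*;

// Sketch: the split list is fixed at source creation, and recovery restores
// the same reader configuration and offsets instead of re-splitting.
class SourceWrapperSketch {
    private final List<String> splits;                          // fixed at creation
    private final Map<Integer, Long> offsets = new HashMap<>(); // checkpointed assignment

    SourceWrapperSketch(List<String> splits) {
        this.splits = new ArrayList<>(splits);
    }

    // What gets checkpointed during runtime.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(offsets);
    }

    // On restore: same splits, same readers, restored positions.
    void restore(Map<Integer, Long> checkpoint) {
        offsets.clear();
        offsets.putAll(checkpoint);
    }

    void advance(int split, long offset) {
        offsets.put(split, offset);
    }

    long position(int split) {
        return offsets.getOrDefault(split, 0L);
    }
}
```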
> >> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> >> wrote:
> >>
> >> > Hi Amit,
> >> >
> >> > thanks for the explanation.
> >> >
> >> > For 4, you are right, it's slightly different from DataXchange (related
> >> > to the elements in the PCollection). I think storing the "starting
> >> > point" for a reader makes sense.
> >> >
> >> > Regards
> >> > JB
> >> >
> >> > On 10/10/2016 10:33 AM, Amit Sela wrote:
> >> >>
> >> >> Inline, thanks JB!
> >> >>
> >> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> >> >> wrote:
> >> >>
> >> >>> Hi Amit,
> >> >>>
> >> >>> For 1., the runner is responsible for the checkpoint storage
> >> >>> (associated with the source). It's the way for the runner to retry
> >> >>> and know the failed bundles.
> >> >>>
> >> >> True, this was a recap/summary of another, not-so-clear, thread.
> >> >>
> >> >>> For 4, are you proposing that KafkaRecord store additional metadata
> >> >>> for that? It sounds like what I proposed in the "Technical Vision"
> >> >>> appendix document: there I proposed to introduce a DataXchange object
> >> >>> that stores some additional metadata (like offset) used by the
> >> >>> runner. It would be the same with SDF, as the tracker state should be
> >> >>> persistent as well.
> >> >>>
> >> >> I think I was more focused on persisting the "starting point" for a
> >> >> reader, even if no records were read (yet), so that the next time the
> >> >> reader attempts to read it will pick up there. This has more to do
> >> >> with how the CheckpointMark handles this.
> >> >> I have to say that I'm not familiar with your DataXchange proposal; I
> >> >> will take a look though.
> >> >>
> >> >>> Regards
> >> >>> JB
> >> >>>
> >> >>> On 10/08/2016 01:55 AM, Amit Sela wrote:
> >> >>>>
> >> >>>> I started a thread about (suggesting) UnboundedSource splitIds and
> >> >>>> it turned into an UnboundedSource/KafkaIO discussion, and I think
> >> >>>> it's best to start over in a clear [DISCUSS] thread.
> >> >>>>
> >> >>>> When working on UnboundedSource support for the Spark runner, I've
> >> >>>> raised some questions; some were general-UnboundedSource, and
> >> >>>> others Kafka-specific.
> >> >>>>
> >> >>>> I'd like to recap them here, and maybe have a more productive and
> >> >>>> well-documented discussion for everyone.
> >> >>>>
> >> >>>>    1. UnboundedSource ids - I assume any runner persists the
> >> >>>>    UnboundedSource's CheckpointMark for fault-tolerance, but I
> >> >>>>    wonder how it matches the appropriate split (of the
> >> >>>>    UnboundedSource) to its previously persisted CheckpointMark in
> >> >>>>    any specific worker?
> >> >>>>    *Thomas Groh* mentioned that Source splits have to have an
> >> >>>>    associated identifier, and so the runner gets to tag splits
> >> >>>>    however it pleases, so long as those tags don't allow splits to
> >> >>>>    bleed into each other.
> >> >>>>    2. Consistent splitting - an UnboundedSource splitting seems to
> >> >>>>    require consistent splitting if it were to "pick up where it
> >> >>>>    left off", correct? This is not mentioned as a requirement or a
> >> >>>>    recommendation in UnboundedSource#generateInitialSplits(), so is
> >> >>>>    this a Kafka-only issue?
> >> >>>>    *Raghu Angadi* mentioned that Kafka already does so by assigning
> >> >>>>    partitions to readers in a round-robin manner.
> >> >>>>    *Thomas Groh* also added that while the UnboundedSource API
> >> >>>>    doesn't require deterministic splitting (although it's
> >> >>>>    recommended), a PipelineRunner should keep track of the
> >> >>>>    initially generated splits.
> >> >>>>    3. Support reading of Kafka partitions that were added to
> >> >>>>    topic/s while a Pipeline reads from them - BEAM-727
> >> >>>>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> >> >>>>    4. Reading/persisting Kafka start offsets - since Spark works in
> >> >>>>    micro-batches, if "latest" was applied on a fairly sparse topic,
> >> >>>>    each worker would actually begin reading only after it saw a
> >> >>>>    message during the time window it had to read messages. This is
> >> >>>>    because fetching the offsets is done by the worker running the
> >> >>>>    Reader. This means that each Reader sees a different state of
> >> >>>>    "latest" (for its partition/s), such that a failing Reader that
> >> >>>>    hasn't read yet might fetch a different "latest" once it's
> >> >>>>    recovered than what it originally fetched. While this may not be
> >> >>>>    as painful for other runners, IMHO it lacks correctness, and I'd
> >> >>>>    suggest either reading the Kafka cluster's metadata once upon
> >> >>>>    initial splitting, or adding some of it to the CheckpointMark.
> >> >>>>    Filed BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
> >> >>>>
> >> >>>> The original thread is called "Should UnboundedSource provide a
> >> >>>> split identifier?".
> >> >>>>
> >> >>>> While the only specific implementation of UnboundedSource discussed
> >> >>>> here is Kafka, it is probably the most popular open-source
> >> >>>> UnboundedSource. Having said that, I wonder where this meets
> >> >>>> PubSub? Or any other UnboundedSource that those questions might
> >> >>>> affect.
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Amit
> >> >>>
> >> >>> --
> >> >>> Jean-Baptiste Onofré
> >> >>> jbono...@apache.org
> >> >>> http://blog.nanthrax.net
> >> >>> Talend - http://www.talend.com
> >> >
> >> > --
> >> > Jean-Baptiste Onofré
> >> > jbono...@apache.org
> >> > http://blog.nanthrax.net
> >> > Talend - http://www.talend.com
> >>
>
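Point 4 in Amit's recap above (each worker resolving "latest" independently,
and a recovered reader possibly re-resolving it) can be illustrated with a
minimal sketch (plain Java, no Kafka dependency; all names are illustrative,
not KafkaIO's):

```java
import java.util.*;

// Sketch of the "latest" race: a reader that persists the offset resolved at
// initial splitting can resume exactly; one that re-resolves "latest" after
// recovery silently skips records published in the meantime.
class LatestOffsetSketch {
    // "latest" = the current end of the partition (modeled as a list).
    static long resolveLatest(List<String> partition) {
        return partition.size();
    }

    static List<String> readFrom(List<String> partition, long offset) {
        return partition.subList((int) offset, partition.size());
    }
}
```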
