Thanks Max!

I'll try to explain Spark's stateful operators and how/why I used them with
UnboundedSource.

Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
Since updateStateByKey is bound to output the (updated) state itself - the
CheckpointMark in our case - we're left with mapWithState.
mapWithState provides a persistent, distributed, "map-like" state store that
is partitioned according to the stream. This is indeed how I manage state
between micro-batches.
However, mapWithState (like any map) will give you a value (state)
corresponding to a specific key, so I use a running-id from the initial
splitting to identify the appropriate state.
I took a look at Flink's implementation (I do that sometimes ;-) ) and I
could do the same and save the split source with the CheckpointMark, but
it would still have to correspond to the same id. Since I had to wrap the
split Source to perform a sort of "BoundedReadFromUnboundedSource", I simply
added an id field and I'm hashing by that id.
I'll also add that the stateful operator can only be applied to a
(Pair)Stream and not to input operators so I'm actually generating a stream
of splits (the same ones for every micro-batch) and reading from within the
mappingFunction of the mapWithState.
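
As a rough illustration of the flow described above, here is a minimal
plain-Python simulation. It is not Spark or Beam code and not the runner's
actual implementation: `Split`, `CheckpointMark`, `mapping_function`,
`run_micro_batch`, and the in-memory dict standing in for mapWithState's keyed
state are all hypothetical names made up for this sketch.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Split:
    """Hypothetical wrapper around a split source, tagged with a running id."""
    split_id: int
    records: Tuple[str, ...]  # pretend backing data for this split


@dataclass(frozen=True)
class CheckpointMark:
    """Hypothetical checkpoint: the next position to read from."""
    offset: int


def initial_splits(data: List[str], num_splits: int) -> List[Split]:
    # Split once, round-robin, and assign a running id to each split.
    return [Split(i, tuple(data[i::num_splits])) for i in range(num_splits)]


def mapping_function(
    split: Split, state: Optional[CheckpointMark], max_records: int
) -> Tuple[List[str], CheckpointMark]:
    # Stand-in for the mapping function: do a bounded read that starts at the
    # stored mark (or at 0 if there is no state yet) and return the records
    # together with the updated mark.
    start = state.offset if state is not None else 0
    end = min(start + max_records, len(split.records))
    return list(split.records[start:end]), CheckpointMark(end)


def run_micro_batch(
    splits: List[Split], state_store: Dict[int, CheckpointMark], max_records: int
) -> List[str]:
    # The same splits are fed in for every micro-batch; progress lives only
    # in the state that is keyed by split id.
    output: List[str] = []
    for split in splits:
        read, mark = mapping_function(
            split, state_store.get(split.split_id), max_records
        )
        state_store[split.split_id] = mark
        output.extend(read)
    return output


splits = initial_splits([f"r{i}" for i in range(6)], num_splits=2)
state: Dict[int, CheckpointMark] = {}
batch1 = run_micro_batch(splits, state, max_records=2)  # ['r0', 'r2', 'r1', 'r3']
batch2 = run_micro_batch(splits, state, max_records=2)  # ['r4', 'r5']
```

The point of the sketch is only that the splits themselves never change
between micro-batches; each read resumes from whatever mark is stored under
that split's id.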

It's not the simplest design, but given how Spark's persistent state and
InputDStream are designed compared to the Beam model, I don't see another
way - though I'd be happy to hear one!

Pretty sure I've already shared these here, but no harm in adding the links
again: the design doc
<https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
and a work-in-progress branch
<https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, both also
mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658>.
The design doc also relates to how "pure" Spark works with Kafka, which I
think is interesting and very different from Flink/Dataflow.

Hope this helps clear things up a little; please keep asking if
something is still unclear.

Thanks,
Amit.

On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <m...@apache.org> wrote:

> Just to add a comment from the Flink side and its
> UnboundedSourceWrapper: we found that the only way to guarantee
> deterministic splitting of the source was to generate the splits upon
> creation of the source and then checkpoint the assignment during
> runtime. When restoring from a checkpoint, the same reader
> configuration is restored. It's not possible to change the splitting
> after the initial splitting has taken place. However, Flink will soon
> be able to repartition the operator state upon restart/rescaling of a
> job.
>
> Does Spark have a way to pass state of a previous mini batch to the
> current mini batch? If so, you could restore the last configuration
> and continue reading from the checkpointed offset. You just have to
> checkpoint before the mini batch ends.
>
> -Max
>
> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> > Hi Amit,
> >
> > thanks for the explanation.
> >
> > For 4, you are right, it's slightly different from DataXchange (related to
> > the elements in the PCollection). I think storing the "starting point" for a
> > reader makes sense.
> >
> > Regards
> > JB
> >
> > On 10/10/2016 10:33 AM, Amit Sela wrote:
> >>
> >> Inline, thanks JB!
> >>
> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> >> wrote:
> >>
> >>> Hi Amit,
> >>>
> >>> For 1., the runner is responsible for the checkpoint storage (associated
> >>> with the source). It's the way for the runner to retry and know the
> >>> failed bundles.
> >>>
> >> True, this was a recap/summary of another, not-so-clear, thread.
> >>
> >>> For 4, are you proposing that KafkaRecord store additional metadata for
> >>> that ? It sounds like what I proposed in the "Technical Vision" appendix
> >>> document: there I proposed to introduce a DataXchange object that stores
> >>> some additional metadata (like offset) used by the runner. It would be
> >>> the same with SDF as the tracker state should be persistent as well.
> >>>
> >> I think I was more focused on persisting the "starting point" for a reader,
> >> even if no records were read (yet), so that the next time the reader
> >> attempts to read it will pick up from there. This has more to do with how
> >> the CheckpointMark handles this.
> >> I have to say that I'm not familiar with your DataXchange proposal; I will
> >> take a look though.
> >>
> >>> Regards
> >>> JB
> >>>
> >>> On 10/08/2016 01:55 AM, Amit Sela wrote:
> >>>
> >>>> I started a thread about (suggesting) UnboundedSource splitId's and it
> >>>> turned into an UnboundedSource/KafkaIO discussion, and I think it's best
> >>>> to start over in a clear [DISCUSS] thread.
> >>>>
> >>>> When working on UnboundedSource support for the Spark runner, I've raised
> >>>> some questions, some were general-UnboundedSource, and others
> >>>> Kafka-specific.
> >>>>
> >>>> I'd like to recap them here, and maybe have a more productive and
> >>>> well-documented discussion for everyone.
> >>>>
> >>>>    1. UnboundedSource id's - I assume any runner persists the
> >>>>    UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how it
> >>>>    matches the appropriate split (of the UnboundedSource) to its previously
> >>>>    persisted CheckpointMark in any specific worker ?
> >>>>    *Thomas Groh* mentioned that Source splits have to have an associated
> >>>>    identifier, and so the runner gets to tag splits however it pleases, so
> >>>>    long as those tags don't allow splits to bleed into each other.
> >>>>    2. Consistent splitting - an UnboundedSource splitting seems to require
> >>>>    consistent splitting if it were to "pick up where it left off", correct ?
> >>>>    This is not mentioned as a requirement or a recommendation in
> >>>>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue ?
> >>>>    *Raghu Angadi* mentioned that Kafka already does so by applying
> >>>>    partitions to readers in a round-robin manner.
> >>>>    *Thomas Groh* also added that while the UnboundedSource API doesn't
> >>>>    require deterministic splitting (although it's recommended), a
> >>>>    PipelineRunner should keep track of the initially generated splits.
> >>>>    3. Support reading of Kafka partitions that were added to topic/s while
> >>>>    a Pipeline reads from them - BEAM-727
> >>>>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> >>>>    4. Reading/persisting Kafka start offsets - since Spark works in
> >>>>    micro-batches, if "latest" was applied on a fairly sparse topic each
> >>>>    worker would actually begin reading only after it saw a message during
> >>>>    the time window it had to read messages. This is because fetching the
> >>>>    offsets is done by the worker running the Reader. This means that each
> >>>>    Reader sees a different state of "latest" (for its partition/s), such
> >>>>    that a failing Reader that hasn't read yet might fetch a different
> >>>>    "latest" once it's recovered than what it originally fetched. While this
> >>>>    may not be as painful for other runners, IMHO it lacks correctness and
> >>>>    I'd suggest either reading Kafka metadata of the Kafka cluster once upon
> >>>>    initial splitting, or adding some of it to the CheckpointMark. Filed
> >>>>    BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
> >>>>
> >>>> The original thread is called "Should UnboundedSource provide a split
> >>>> identifier ?".
> >>>>
> >>>> While the only specific implementation of UnboundedSource discussed here
> >>>> is Kafka, it is probably the most popular open-source UnboundedSource.
> >>>> Having said that, I wonder where this meets PubSub ? or any other
> >>>> UnboundedSource that those questions might affect.
> >>>>
> >>>> Thanks,
> >>>> Amit
> >>>>
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>
