Thanks for the explanation, Amit!

What you described doesn't sound so different from how the Flink
Runner works with the UnboundedSource interface. Setting aside the
mini batches and the discretization of the stream that they require,
the checkpointing logic is pretty similar. The Flink wrapper doesn't
use an id to identify the checkpointed state because the state is kept
per operator and restored to each instance in case of a failure. In
Spark, the state is directly scoped by key. That actually makes a lot
of sense when you want to rescale a job, and that's the direction in
which Flink is currently improving its state interface.
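
To make the difference concrete, here is a rough sketch in plain Python.
It is only an illustration under stated assumptions: the dicts stand in
for runner-managed state, and the names assign_keyed and
restore_per_operator are hypothetical, not Flink, Spark, or Beam APIs.
Key-scoped checkpoints can be redistributed for any parallelism, while
operator-scoped state goes back to the same instance it came from.

```python
# Hypothetical sketch: plain dicts stand in for runner-managed state.
# None of these names come from Flink, Spark, or Beam.

def assign_keyed(checkpoints, parallelism):
    """Distribute key-scoped checkpoints across `parallelism` workers."""
    workers = [{} for _ in range(parallelism)]
    for split_id, offset in checkpoints.items():
        # The split id is the key, so any worker count gives a valid layout.
        workers[split_id % parallelism][split_id] = offset
    return workers


def restore_per_operator(operator_states, parallelism):
    """Restore operator-scoped state: instance i gets back exactly state i."""
    if parallelism != len(operator_states):
        raise ValueError("operator-scoped state cannot be rescaled in this sketch")
    return list(operator_states)


# Four splits, each with a checkpointed offset, keyed by a running split id.
keyed_checkpoints = {0: 120, 1: 87, 2: 301, 3: 15}

# Key-scoped state: the same checkpoints laid out for 2 and then 3 workers.
before = assign_keyed(keyed_checkpoints, 2)
after = assign_keyed(keyed_checkpoints, 3)

# Operator-scoped state: two instances, each holding the splits it owned.
operator_states = [{0: 120, 2: 301}, {1: 87, 3: 15}]
restored = restore_per_operator(operator_states, 2)
```

In this sketch every split keeps its offset after going from 2 to 3
workers, whereas restoring the operator-scoped state with a different
parallelism is simply rejected.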


-Max


On Mon, Oct 10, 2016 at 3:35 PM, Amit Sela <amitsel...@gmail.com> wrote:
> Thanks Max!
>
> I'll try to explain Spark's stateful operators and how/why I used them with
> UnboundedSource.
>
> Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> Since updateStateByKey is bound to output the (updated) state itself - the
> CheckpointMark in our case - we're left with mapWithState.
> mapWithState provides a persistent, distributed "map-like" structure that
> is partitioned according to the stream. This is indeed how I manage state
> between micro-batches.
> However, mapWithState (like any map) will give you a value (state)
> corresponding to a specific key, so I use a running-id from the initial
> splitting to identify the appropriate state.
> I took a look at Flink's implementation ( I do that sometimes ;-) ) and I
> could do the same and save the split source with the CheckpointMark but
> it'll still have to correspond to the same id, and since I had to wrap the
> split Source to perform a sort of "BoundedReadFromUnboundedSource" I simply
> added an id field and I'm hashing by that id.
> I'll also add that the stateful operator can only be applied to a
> (Pair)Stream and not to input operators so I'm actually generating a stream
> of splits (the same ones for every micro-batch) and reading from within the
> mappingFunction of the mapWithState.
>
> It's not the simplest design, but given how Spark's persistent state and
> InputDStream are designed compared to the Beam model, I don't see another
> way - though I'd be happy to hear one!
>
> Pretty sure I've added this here but no harm in adding the link again: design
> doc
> <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> and
> a work-in-progress branch
> <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP> all
> mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658> as
> well.
> The design doc also relates to how "pure" Spark works with Kafka, which I
> think is interesting and very different from Flink/Dataflow.
>
> Hope this helped clear things up a little, please keep on asking if
> something is not clear yet.
>
> Thanks,
> Amit.
>
> On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <m...@apache.org> wrote:
>
>> Just to add a comment from the Flink side and its
>> UnboundedSourceWrapper. In our experience, the only way to guarantee
>> deterministic splitting of the source was to generate the splits upon
>> creation of the source and then checkpoint the assignment during
>> runtime. When restoring from a checkpoint, the same reader
>> configuration is restored. It's not possible to change the splitting
>> after the initial splitting has taken place. However, Flink will soon
>> be able to repartition the operator state upon restart/rescaling of a
>> job.
>>
>> Does Spark have a way to pass state of a previous mini batch to the
>> current mini batch? If so, you could restore the last configuration
>> and continue reading from the checkpointed offset. You just have to
>> checkpoint before the mini batch ends.
>>
>> -Max
>>
>> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>> > Hi Amit,
>> >
>> > thanks for the explanation.
>> >
>> > For 4, you are right, it's slightly different from DataXchange (related to
>> > the elements in the PCollection). I think storing the "starting point" for a
>> > reader makes sense.
>> >
>> > Regards
>> > JB
>> >
>> > On 10/10/2016 10:33 AM, Amit Sela wrote:
>> >>
>> >> Inline, thanks JB!
>> >>
>> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <j...@nanthrax.net>
>> >> wrote:
>> >>
>> >>> Hi Amit,
>> >>>
>> >>> For 1., the runner is responsible for the checkpoint storage (associated
>> >>> with the source). It's the way for the runner to retry and know the
>> >>> failed bundles.
>> >>>
>> >> True, this was a recap/summary of another, not-so-clear, thread.
>> >>
>> >>> For 4, are you proposing that KafkaRecord store additional metadata for
>> >>> that ? It sounds like what I proposed in the "Technical Vision" appendix
>> >>> document: there I proposed to introduce a DataXchange object that stores
>> >>> some additional metadata (like offset) used by the runner. It would be
>> >>> the same with SDF as the tracker state should be persistent as well.
>> >>>
>> >> I think I was more focused on persisting the "starting point" for a
>> >> reader, even if no records were read (yet), so that the next time the
>> >> reader attempts to read it will pick up from there. This has more to do
>> >> with how the CheckpointMark handles this.
>> >> I have to say that I'm not familiar with your DataXchange proposal, I will
>> >> take a look though.
>> >>
>> >>> Regards
>> >>> JB
>> >>>
>> >>> On 10/08/2016 01:55 AM, Amit Sela wrote:
>> >>>
>> >>>> I started a thread about (suggesting) UnboundedSource splitId's and it
>> >>>> turned into an UnboundedSource/KafkaIO discussion, and I think it's best to
>> >>>> start over in a clear [DISCUSS] thread.
>> >>>>
>> >>>> When working on UnboundedSource support for the Spark runner, I've raised
>> >>>> some questions, some were general-UnboundedSource, and others
>> >>>> Kafka-specific.
>> >>>>
>> >>>> I'd like to recap them here, and maybe have a more productive and
>> >>>> well-documented discussion for everyone.
>> >>>>
>> >>>>    1. UnboundedSource id's - I assume any runner persists the
>> >>>>    UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how it
>> >>>>    matches the appropriate split (of the UnboundedSource) to its previously
>> >>>>    persisted CheckpointMark in any specific worker ?
>> >>>>    *Thomas Groh* mentioned that Source splits have to have an associated
>> >>>>    identifier, and so the runner gets to tag splits however it pleases, so
>> >>>>    long as those tags don't allow splits to bleed into each other.
>> >>>>    2. Consistent splitting - an UnboundedSource splitting seems to require
>> >>>>    consistent splitting if it were to "pick up where it left off", correct ?
>> >>>>    This is not mentioned as a requirement or a recommendation in
>> >>>>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue ?
>> >>>>    *Raghu Angadi* mentioned that Kafka already does so by applying
>> >>>>    partitions to readers in a round-robin manner.
>> >>>>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>> >>>>    require deterministic splitting (although it's recommended), a
>> >>>>    PipelineRunner should keep track of the initially generated splits.
>> >>>>    3. Support reading of Kafka partitions that were added to topic/s while
>> >>>>    a Pipeline reads from them - BEAM-727
>> >>>>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>> >>>>    4. Reading/persisting Kafka start offsets - since Spark works in
>> >>>>    micro-batches, if "latest" was applied on a fairly sparse topic each worker
>> >>>>    would actually begin reading only after it saw a message during the time
>> >>>>    window it had to read messages. This is because fetching the offsets is
>> >>>>    done by the worker running the Reader. This means that each Reader sees a
>> >>>>    different state of "latest" (for its partition/s), such that a failing
>> >>>>    Reader that hasn't read yet might fetch a different "latest" once it's
>> >>>>    recovered than what it originally fetched. While this may not be as painful
>> >>>>    for other runners, IMHO it lacks correctness and I'd suggest either reading
>> >>>>    Kafka metadata of the Kafka cluster once upon initial splitting, or adding
>> >>>>    some of it to the CheckpointMark. Filed BEAM-704
>> >>>>    <https://issues.apache.org/jira/browse/BEAM-704>.
>> >>>>
>> >>>> The original thread is called "Should UnboundedSource provide a split
>> >>>> identifier ?".
>> >>>>
>> >>>> While the only specific implementation of UnboundedSource discussed here is
>> >>>> Kafka, it is probably the most popular open-source UnboundedSource. Having
>> >>>> said that, I wonder where this meets PubSub ? or any other UnboundedSource
>> >>>> that those questions might affect.
>> >>>>
>> >>>> Thanks,
>> >>>> Amit
>> >>>>
>> >>>
>> >>> --
>> >>> Jean-Baptiste Onofré
>> >>> jbono...@apache.org
>> >>> http://blog.nanthrax.net
>> >>> Talend - http://www.talend.com
>> >>>
>> >>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbono...@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
