Just looking to the future, have you given any thought to how well
this would work with https://s.apache.org/splittable-do-fn?

On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela <[email protected]> wrote:
> Thanks Max!
>
> I'll try to explain Spark's stateful operators and how/why I used them with
> UnboundedSource.
>
> Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> Since updateStateByKey is bound to output the (updated) state itself - the
> CheckpointMark in our case - we're left with mapWithState.
> mapWithState provides a persistent, distributed "map-like" structure that is
> partitioned according to the stream. This is indeed how I manage state
> between micro-batches.
> However, mapWithState (like any map) will give you a value (state)
> corresponding to a specific key, so I use a running-id from the initial
> splitting to identify the appropriate state.
> I took a look at Flink's implementation ( I do that sometimes ;-) ) and I
> could do the same and save the split source with the CheckpointMark but
> it'll still have to correspond to the same id, and since I had to wrap the
> split Source to perform a sort of "BoundedReadFromUnboundedSource" I simply
> added an id field and I'm hashing by that id.
> I'll also add that the stateful operator can only be applied to a
> (Pair)Stream and not to input operators so I'm actually generating a stream
> of splits (the same ones for every micro-batch) and reading from within the
> mappingFunction of the mapWithState.
>
> It's not the simplest design, but given how Spark's persistent state and
> InputDStream are designed compared to the Beam model, I don't see another
> way - though I'd be happy to hear one!
>
> Pretty sure I've added this here but no harm in adding the link again:
> design doc
> <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> and a work-in-progress branch
> <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, all
> mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658> as
> well.
> The design doc also relates to how "pure" Spark works with Kafka, which I
> think is interesting and very different from Flink/Dataflow.
>
> Hope this helped clear things up a little, please keep on asking if
> something is not clear yet.
>
> Thanks,
> Amit.
>
> On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <[email protected]> wrote:
>
>> Just to add a comment from the Flink side and its
>> UnboundedSourceWrapper. In our experience, the only way to guarantee
>> deterministic splitting of the source was to generate the splits upon
>> creation of the source and then checkpoint the assignment during
>> runtime. When restoring from a checkpoint, the same reader
>> configuration is restored. It's not possible to change the splitting
>> after the initial splitting has taken place. However, Flink will soon
>> be able to repartition the operator state upon restart/rescaling of a
>> job.
>>
>> Does Spark have a way to pass the state of a previous mini batch to the
>> current mini batch? If so, you could restore the last configuration
>> and continue reading from the checkpointed offset. You just have to
>> checkpoint before the mini batch ends.
>>
>> -Max
>>
>> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>> > Hi Amit,
>> >
>> > thanks for the explanation.
>> >
>> > For 4, you are right, it's slightly different from DataXchange (related to
>> > the elements in the PCollection). I think storing the "starting point" for
>> > a reader makes sense.
>> >
>> > Regards
>> > JB
>> >
>> > On 10/10/2016 10:33 AM, Amit Sela wrote:
>> >>
>> >> Inline, thanks JB!
>> >>
>> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <[email protected]> wrote:
>> >>
>> >>> Hi Amit,
>> >>>
>> >>> For 1., the runner is responsible for the checkpoint storage (associated
>> >>> with the source). It's the way for the runner to retry and know the
>> >>> failed bundles.
>> >>>
>> >> True, this was a recap/summary of another, not-so-clear, thread.
>> >>
>> >>> For 4, are you proposing that KafkaRecord store additional metadata for
>> >>> that? It sounds like what I proposed in the "Technical Vision" appendix
>> >>> document: there I proposed to introduce a DataXchange object that stores
>> >>> some additional metadata (like offset) used by the runner. It would be
>> >>> the same with SDF as the tracker state should be persistent as well.
>> >>>
>> >> I think I was more focused on persisting the "starting point" for a
>> >> reader, even if no records were read (yet), so that the next time the
>> >> reader attempts to read it will pick up from there. This has more to do
>> >> with how the CheckpointMark handles this.
>> >> I have to say that I'm not familiar with your DataXchange proposal; I will
>> >> take a look though.
>> >>
>> >>> Regards
>> >>> JB
>> >>>
>> >>> On 10/08/2016 01:55 AM, Amit Sela wrote:
>> >>>
>> >>>> I started a thread about (suggesting) UnboundedSource splitId's and it
>> >>>> turned into an UnboundedSource/KafkaIO discussion, and I think it's best
>> >>>> to start over in a clear [DISCUSS] thread.
>> >>>>
>> >>>> When working on UnboundedSource support for the Spark runner, I've raised
>> >>>> some questions, some were general-UnboundedSource, and others
>> >>>> Kafka-specific.
>> >>>>
>> >>>> I'd like to recap them here, and maybe have a more productive and
>> >>>> well-documented discussion for everyone.
>> >>>>
>> >>>>    1. UnboundedSource id's - I assume any runner persists the
>> >>>>    UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how
>> >>>>    it matches the appropriate split (of the UnboundedSource) to its
>> >>>>    previously persisted CheckpointMark in any specific worker?
>> >>>>    *Thomas Groh* mentioned that Source splits have to have an associated
>> >>>>    identifier, and so the runner gets to tag splits however it pleases,
>> >>>>    so long as those tags don't allow splits to bleed into each other.
>> >>>>    2. Consistent splitting - an UnboundedSource splitting seems to require
>> >>>>    consistent splitting if it were to "pick up where it left off",
>> >>>>    correct? This is not mentioned as a requirement or a recommendation in
>> >>>>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
>> >>>>    *Raghu Angadi* mentioned that Kafka already does so by applying
>> >>>>    partitions to readers in a round-robin manner.
>> >>>>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>> >>>>    require deterministic splitting (although it's recommended), a
>> >>>>    PipelineRunner should keep track of the initially generated splits.
>> >>>>    3. Support reading of Kafka partitions that were added to topic/s while
>> >>>>    a Pipeline reads from them - BEAM-727
>> >>>>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>> >>>>    4. Reading/persisting Kafka start offsets - since Spark works in
>> >>>>    micro-batches, if "latest" was applied on a fairly sparse topic each
>> >>>>    worker would actually begin reading only after it saw a message during
>> >>>>    the time window it had to read messages. This is because fetching the
>> >>>>    offsets is done by the worker running the Reader. This means that each
>> >>>>    Reader sees a different state of "latest" (for its partition/s), such
>> >>>>    that a failing Reader that hasn't read yet might fetch a different
>> >>>>    "latest" once it's recovered than what it originally fetched. While
>> >>>>    this may not be as painful for other runners, IMHO it lacks correctness
>> >>>>    and I'd suggest either reading Kafka metadata of the Kafka cluster once
>> >>>>    upon initial splitting, or adding some of it to the CheckpointMark.
>> >>>>    Filed BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
>> >>>>
>> >>>> The original thread is called "Should UnboundedSource provide a split
>> >>>> identifier ?".
>> >>>>
>> >>>> While the only specific implementation of UnboundedSource discussed here
>> >>>> is Kafka, it is probably the most popular open-source UnboundedSource.
>> >>>> Having said that, I wonder where this meets PubSub, or any other
>> >>>> UnboundedSource that those questions might affect.
>> >>>>
>> >>>> Thanks,
>> >>>> Amit
>> >>>>
>> >>>
>> >>> --
>> >>> Jean-Baptiste Onofré
>> >>> [email protected]
>> >>> http://blog.nanthrax.net
>> >>> Talend - http://www.talend.com
>> >>>
>> >>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected]
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
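
To make the approach discussed in this thread a bit more concrete, here is a minimal sketch, in plain Python rather than Spark, of keeping one checkpointed offset per split in a map keyed by a split id and carrying it across micro-batches. It also resolves "latest" once and persists it even when nothing was read, along the lines of point 4. All names (`read_micro_batch`, `run_micro_batch`, the list-backed logs) are hypothetical stand-ins for illustration; this is neither the Spark runner's code nor the mapWithState API.

```python
from typing import Dict, List, Optional, Tuple


def read_micro_batch(
    log: List[str], checkpoint: Optional[int], max_records: int
) -> Tuple[List[str], int]:
    """Read up to max_records from log, starting at the checkpointed offset.

    With no checkpoint yet, "latest" is resolved once to the current end of
    the log; the caller persists it even if no records were read.
    """
    start = len(log) if checkpoint is None else checkpoint
    records = log[start:start + max_records]
    return records, start + len(records)


def run_micro_batch(
    logs: Dict[int, List[str]], state: Dict[int, int], max_records: int = 2
) -> Dict[int, List[str]]:
    """Process one micro-batch over the same fixed set of splits.

    state plays the role of the persistent map: split id -> checkpointed offset.
    """
    output: Dict[int, List[str]] = {}
    for split_id, log in logs.items():
        records, new_offset = read_micro_batch(log, state.get(split_id), max_records)
        state[split_id] = new_offset  # persist the mark, even for an empty read
        output[split_id] = records
    return output


# Two splits (hypothetical Kafka-like partitions), identified by ids 0 and 1.
logs = {0: ["a", "b"], 1: []}
state: Dict[int, int] = {}

first = run_micro_batch(logs, state)   # "latest" is resolved and stored
logs[0].extend(["c", "d", "e"])
logs[1].append("x")
second = run_micro_batch(logs, state)  # resumes from the stored offsets
third = run_micro_batch(logs, state)
```

Because the offset resolved for "latest" is stored on the first pass, a reader that restarts before reading anything resumes from the same position instead of fetching a newer "latest".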
