If this issue doesn't make sense for "native" streaming systems, and it's
only a Spark issue (and my implementation of Read.Unbounded) - I could keep
doing what I do, use a running id.
I was just wondering... ( hence the question mark in the title ;-) )

On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <[email protected]> wrote:

> Not sure how it works in Dataflow or Flink, but I'm working on an
> implementation for Spark using the (almost) only stateful operator it has -
> "mapWithState" - and the State needs to correspond to a key.
> Each micro-batch, the Sources recreate the readers and "look-up" the
> latest checkpoint.
>
> On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi <[email protected]>
> wrote:
>
>> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <[email protected]> wrote:
>>
>> > @Raghu, hashing <topic, partition> is exactly what I mean, but I'm
>> asking
>> > if it should be abstracted in the Source.. Otherwise, runners will have
>> to
>> > *is instance of* on every Source, and write their own hash
>> implementation.
>> > Since splits contain the "splitted" Source, and it contains it's own
>> > CheckpointMark, and hashing would probably be tied to that
>> CheckpointMark,
>> > why not abstract it in the UnboundedSource ?
>> >
>>
>> I don't quite follow how a runner should be concerned about hashing used
>> by
>> a Source for its own splits. Can you give a concrete example? To me it
>> looks like source and checkpoint objects are completely opaque to the
>> runners.
>>
>>
>> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <[email protected]
>> >
>> > wrote:
>> >
>> > > > If splits (UnboundedSources) had an identifier, this could be
>> avoided,
>> > > and checkpoints could be persisted accordingly.
>> > >
>> > > The order of the splits that a source returns is preserved. So during
>> an
>> > > update, you can expect 5th split gets invoked with the same checkpoint
>> > mark
>> > > that 5th split saved before upgrade. You can hash <topic, partition>
>> to
>> > one
>> > > of the indices.
>> > >
>> > > KafkaIO.java doe not support change in partitions.
>> > >
>> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <
>> > > [email protected]> wrote:
>> > >
>> > > > Oh! Okay, looks like this is a part of the code I was unfamiliar
>> with.
>> > > I'd
>> > > > like to know the answer too, in this case.
>> > > > +Daniel Mills <[email protected]> can you comment ?
>> > > >
>> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]>
>> wrote:
>> > > >
>> > > > > That is correct, as long as non of the Kafka topics "grow" another
>> > > > > partition (which it could).
>> > > > > In that case, some bundle will have to start reading from this
>> > > partition
>> > > > as
>> > > > > well, and since all other bundles already have a "previous
>> > checkpoint"
>> > > it
>> > > > > matters which checkpoint to relate to. I don't know how it's
>> > > implemented
>> > > > in
>> > > > > Dataflow, but in Spark I'm testing using mapWithState to store the
>> > > > > checkpoints per split.
>> > > > > It also seems that order does matter to the KafkIO:
>> > > > >
>> > > > > https://github.com/apache/incubator-beam/blob/master/
>> > > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
>> > > > kafka/KafkaIO.java#L636
>> > > > >
>> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
>> > > > > <[email protected]> wrote:
>> > > > >
>> > > > > > Hi Amit,
>> > > > > > Could you explain more about why you're saying the order of
>> splits
>> > > > > matters?
>> > > > > > AFAIK the semantics of Read.Unbounded is "read from all of the
>> > splits
>> > > > in
>> > > > > > parallel, checkpointing each of them independently", so their
>> order
>> > > > > > shouldn't matter.
>> > > > > >
>> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <[email protected]>
>> > > wrote:
>> > > > > >
>> > > > > > > UnboundedSources generate initial splits, which are basically
>> > > splits
>> > > > of
>> > > > > > > them selves - for example, if an UnboundedKafkaSource is set
>> to
>> > > read
>> > > > > from
>> > > > > > > topic T1 which is made of 2 partitions P1 and P2, it will
>> > > (optimally)
>> > > > > > split
>> > > > > > > into two UnboundedKafkaSource, one per partition.
>> > > > > > > During the lifecycle of the "reader" bundles, CheckpointMarks
>> are
>> > > > used
>> > > > > to
>> > > > > > > checkpoint "last-read" and so readers may restart/resume. I'm
>> > > > assuming
>> > > > > > this
>> > > > > > > is how newly created partitions will automatically be added to
>> > > > readers.
>> > > > > > >
>> > > > > > > The problem is that it's all fine while there is only one
>> topic
>> > > > (Kafka
>> > > > > > > topic-partition pairs are ordered), but if reading from more
>> then
>> > > one
>> > > > > > topic
>> > > > > > > this may break:
>> > > > > > > T1,P1
>> > > > > > > T1,P2
>> > > > > > > T1,P3
>> > > > > > > T2,P1
>> > > > > > > The order is not maintained and T2,P1 is 4th now.
>> > > > > > >
>> > > > > > > If splits (UnboundedSources) had an identifier, this could be
>> > > > avoided,
>> > > > > > and
>> > > > > > > checkpoints could be persisted accordingly.
>> > > > > > > For example, an UnboundedKafkaSource could use the hash code
>> of
>> > > it's
>> > > > > > > assigned topic-partition pairs.
>> > > > > > >
>> > > > > > > I don't know how relevant this is to other Sources, but I
>> guess
>> > it
>> > > is
>> > > > > as
>> > > > > > > long as they may grow their partitions dynamically (though I
>> > might
>> > > be
>> > > > > > > completely wrong...) and I don't see much of a downside.
>> > > > > > >
>> > > > > > > Thoughts ?
>> > > > > > >
>> > > > > > > This is something that troubles me now while working on
>> > > > Read.Unbounded,
>> > > > > > and
>> > > > > > > from a quick look I saw that the FlinkRunner expects "stable"
>> > > > splitting
>> > > > > > as
>> > > > > > > well.. How does Dataflow handle this ?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Amit
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to