On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <[email protected]> wrote:

> @Raghu, hashing <topic, partition> is exactly what I mean, but I'm asking
> if it should be abstracted in the Source.. Otherwise, runners will have to
> *is instance of* on every Source, and write their own hash implementation.
> Since splits contain the "splitted" Source, and it contains its own
> CheckpointMark, and hashing would probably be tied to that CheckpointMark,
> why not abstract it in the UnboundedSource ?
>

I don't quite follow how a runner should be concerned about hashing used by
a Source for its own splits. Can you give a concrete example? To me it
looks like source and checkpoint objects are completely opaque to the
runners.
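
For reference, here is a minimal sketch of the difference between position-based split indices and hashing <topic, partition> to an index, as discussed in the quoted messages below. It is an illustration under stated assumptions, not how KafkaIO is implemented: `SplitIndexSketch`, `TopicPartition`, `positionalIndex` and `hashedIndex` are hypothetical names, and the number of splits is assumed to stay fixed across restarts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

public class SplitIndexSketch {

    /** Hypothetical stand-in for a <topic, partition> pair (not the Kafka client class). */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            // String and Integer hash codes are specified by the JDK,
            // so this value is the same across JVM restarts.
            return Objects.hash(topic, partition);
        }
    }

    /** Position-based index: depends on every other pair in the sorted list. */
    static int positionalIndex(List<TopicPartition> all, TopicPartition tp) {
        List<TopicPartition> sorted = new ArrayList<>(all);
        sorted.sort(Comparator.comparing((TopicPartition t) -> t.topic)
                .thenComparingInt(t -> t.partition));
        return sorted.indexOf(tp);
    }

    /** Hash-based index: depends only on the pair itself and the (assumed fixed) split count. */
    static int hashedIndex(TopicPartition tp, int numSplits) {
        return Math.floorMod(tp.hashCode(), numSplits);
    }

    public static void main(String[] args) {
        TopicPartition t2p1 = new TopicPartition("T2", 1);
        List<TopicPartition> before = Arrays.asList(
                new TopicPartition("T1", 1), new TopicPartition("T1", 2), t2p1);
        // Topic T1 "grows" a third partition.
        List<TopicPartition> after = Arrays.asList(
                new TopicPartition("T1", 1), new TopicPartition("T1", 2),
                new TopicPartition("T1", 3), t2p1);

        // The position of T2,P1 shifts when T1,P3 appears.
        System.out.println("positional before: " + positionalIndex(before, t2p1));
        System.out.println("positional after:  " + positionalIndex(after, t2p1));

        // The hashed index ignores the other pairs, so it does not shift.
        System.out.println("hashed (4 splits): " + hashedIndex(t2p1, 4));
    }
}
```

With hashing, several pairs may land on the same index, so a split would need to be able to read more than one partition, and the mapping only stays stable while the number of splits does not change.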


> On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <[email protected]>
> wrote:
>
> > > If splits (UnboundedSources) had an identifier, this could be avoided,
> > and checkpoints could be persisted accordingly.
> >
> > The order of the splits that a source returns is preserved. So during an
> > update, you can expect the 5th split to be invoked with the same checkpoint
> > mark that the 5th split saved before the upgrade. You can hash
> > <topic, partition> to one of the indices.
> >
> > KafkaIO.java does not support change in partitions.
> >
> > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <
> > [email protected]> wrote:
> >
> > > Oh! Okay, looks like this is a part of the code I was unfamiliar with.
> > I'd
> > > like to know the answer too, in this case.
> > > +Daniel Mills <[email protected]> can you comment ?
> > >
> > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]> wrote:
> > >
> > > > That is correct, as long as none of the Kafka topics "grow" another
> > > > partition (which it could).
> > > > In that case, some bundle will have to start reading from this
> > partition
> > > as
> > > > well, and since all other bundles already have a "previous
> checkpoint"
> > it
> > > > matters which checkpoint to relate to. I don't know how it's
> > implemented
> > > in
> > > > Dataflow, but in Spark I'm testing using mapWithState to store the
> > > > checkpoints per split.
> > > > It also seems that order does matter to the KafkaIO:
> > > >
> > > > https://github.com/apache/incubator-beam/blob/master/
> > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
> > > kafka/KafkaIO.java#L636
> > > >
> > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> > > > <[email protected]> wrote:
> > > >
> > > > > Hi Amit,
> > > > > Could you explain more about why you're saying the order of splits
> > > > matters?
> > > > > AFAIK the semantics of Read.Unbounded is "read from all of the
> splits
> > > in
> > > > > parallel, checkpointing each of them independently", so their order
> > > > > shouldn't matter.
> > > > >
> > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <[email protected]>
> > wrote:
> > > > >
> > > > > > UnboundedSources generate initial splits, which are basically
> > splits
> > > of
> > > > > > > themselves - for example, if an UnboundedKafkaSource is set to
> > read
> > > > from
> > > > > > topic T1 which is made of 2 partitions P1 and P2, it will
> > (optimally)
> > > > > split
> > > > > > into two UnboundedKafkaSource, one per partition.
> > > > > > During the lifecycle of the "reader" bundles, CheckpointMarks are
> > > used
> > > > to
> > > > > > checkpoint "last-read" and so readers may restart/resume. I'm
> > > assuming
> > > > > this
> > > > > > is how newly created partitions will automatically be added to
> > > readers.
> > > > > >
> > > > > > The problem is that it's all fine while there is only one topic
> > > (Kafka
> > > > > > > topic-partition pairs are ordered), but if reading from more than
> > one
> > > > > topic
> > > > > > this may break:
> > > > > > T1,P1
> > > > > > T1,P2
> > > > > > T1,P3
> > > > > > T2,P1
> > > > > > The order is not maintained and T2,P1 is 4th now.
> > > > > >
> > > > > > If splits (UnboundedSources) had an identifier, this could be
> > > avoided,
> > > > > and
> > > > > > checkpoints could be persisted accordingly.
> > > > > > For example, an UnboundedKafkaSource could use the hash code of
> > > > > > its
> > > > > > assigned topic-partition pairs.
> > > > > >
> > > > > > I don't know how relevant this is to other Sources, but I guess
> it
> > is
> > > > as
> > > > > > long as they may grow their partitions dynamically (though I
> might
> > be
> > > > > > completely wrong...) and I don't see much of a downside.
> > > > > >
> > > > > > Thoughts ?
> > > > > >
> > > > > > This is something that troubles me now while working on
> > > Read.Unbounded,
> > > > > and
> > > > > > from a quick look I saw that the FlinkRunner expects "stable"
> > > splitting
> > > > > as
> > > > > > well.. How does Dataflow handle this ?
> > > > > >
> > > > > > Thanks,
> > > > > > Amit
> > > > > >
> > > > >
> > > >
> > >
> >
>
