@Raghu, hashing <topic, partition> is exactly what I mean, but I'm asking
whether it should be abstracted in the Source. Otherwise, runners will have to
do an *instanceof* check on every Source and write their own hash implementation.
Since each split contains the split Source, which has its own
CheckpointMark, and hashing would probably be tied to that CheckpointMark,
why not abstract it in the UnboundedSource?
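
A minimal sketch of the idea, under stated assumptions: `SplitIdSketch`,
`splitId` and `restore` are hypothetical names that do not exist in Beam, and
`TopicPartition` here is a local stand-in rather than the Kafka client class.
It only illustrates keying checkpoints by a stable per-split identifier
instead of by the split's position in the list. A plain string key is used
instead of a hash code to sidestep collisions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these names are part of the Beam API. */
public class SplitIdSketch {

    /** Local stand-in for a Kafka topic-partition pair. */
    public static final class TopicPartition {
        final String topic;
        final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /**
     * Hypothetical identifier a split could expose. It depends only on the
     * assigned topic-partition pairs, not on the split's index.
     */
    public static String splitId(List<TopicPartition> assigned) {
        List<String> parts = new ArrayList<>();
        for (TopicPartition tp : assigned) {
            parts.add(tp.topic + "/" + tp.partition);
        }
        Collections.sort(parts); // make the id independent of assignment order
        return String.join(",", parts);
    }

    /** Looks up the saved checkpoint (here just an offset) for a split, or null. */
    public static Long restore(Map<String, Long> checkpoints, List<TopicPartition> assigned) {
        return checkpoints.get(splitId(assigned));
    }

    public static void main(String[] args) {
        TopicPartition t1p1 = new TopicPartition("T1", 1);
        TopicPartition t1p2 = new TopicPartition("T1", 2);
        TopicPartition t1p3 = new TopicPartition("T1", 3);
        TopicPartition t2p1 = new TopicPartition("T2", 1);

        // Checkpoints saved while the splits were T1/1, T1/2, T2/1.
        Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put(splitId(Collections.singletonList(t1p1)), 100L);
        checkpoints.put(splitId(Collections.singletonList(t1p2)), 200L);
        checkpoints.put(splitId(Collections.singletonList(t2p1)), 300L);

        // T1 grows a third partition, so T2/1 moves from 3rd to 4th place.
        for (TopicPartition tp : Arrays.asList(t1p1, t1p2, t1p3, t2p1)) {
            List<TopicPartition> split = Collections.singletonList(tp);
            System.out.println(splitId(split) + " -> " + restore(checkpoints, split));
        }
    }
}
```

With this keying, T2/1 still finds its own checkpoint after the reordering,
and the new T1/3 split finds none, so it would start fresh.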

On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <[email protected]>
wrote:

> > If splits (UnboundedSources) had an identifier, this could be avoided,
> and checkpoints could be persisted accordingly.
>
> The order of the splits that a source returns is preserved. So during an
> update, you can expect the 5th split to be invoked with the same checkpoint
> mark that the 5th split saved before the upgrade. You can hash
> <topic, partition> to one of the indices.
>
> KafkaIO.java does not support changes in partitions.
>
> On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <
> [email protected]> wrote:
>
> > Oh! Okay, looks like this is a part of the code I was unfamiliar with.
> I'd
> > like to know the answer too, in this case.
> > +Daniel Mills <[email protected]> can you comment ?
> >
> > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]> wrote:
> >
> > > > That is correct, as long as none of the Kafka topics "grow" another
> > > partition (which it could).
> > > In that case, some bundle will have to start reading from this
> partition
> > as
> > > well, and since all other bundles already have a "previous checkpoint"
> it
> > > matters which checkpoint to relate to. I don't know how it's
> implemented
> > in
> > > Dataflow, but in Spark I'm testing using mapWithState to store the
> > > checkpoints per split.
> > > > It also seems that order does matter to KafkaIO:
> > >
> > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
> > >
> > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> > > <[email protected]> wrote:
> > >
> > > > Hi Amit,
> > > > Could you explain more about why you're saying the order of splits
> > > matters?
> > > > AFAIK the semantics of Read.Unbounded is "read from all of the splits
> > in
> > > > parallel, checkpointing each of them independently", so their order
> > > > shouldn't matter.
> > > >
> > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <[email protected]>
> wrote:
> > > >
> > > > > UnboundedSources generate initial splits, which are basically
> splits
> > of
> > > > > > themselves - for example, if an UnboundedKafkaSource is set to
> read
> > > from
> > > > > topic T1 which is made of 2 partitions P1 and P2, it will
> (optimally)
> > > > split
> > > > > into two UnboundedKafkaSource, one per partition.
> > > > > During the lifecycle of the "reader" bundles, CheckpointMarks are
> > used
> > > to
> > > > > checkpoint "last-read" and so readers may restart/resume. I'm
> > assuming
> > > > this
> > > > > is how newly created partitions will automatically be added to
> > readers.
> > > > >
> > > > > The problem is that it's all fine while there is only one topic
> > (Kafka
> > > > > > topic-partition pairs are ordered), but if reading from more than
> one
> > > > topic
> > > > > this may break:
> > > > > T1,P1
> > > > > T1,P2
> > > > > T1,P3
> > > > > T2,P1
> > > > > The order is not maintained and T2,P1 is 4th now.
> > > > >
> > > > > If splits (UnboundedSources) had an identifier, this could be
> > avoided,
> > > > and
> > > > > checkpoints could be persisted accordingly.
> > > > > For example, an UnboundedKafkaSource could use the hash code of
> its
> > > > > assigned topic-partition pairs.
> > > > >
> > > > > I don't know how relevant this is to other Sources, but I guess it
> is
> > > as
> > > > > long as they may grow their partitions dynamically (though I might
> be
> > > > > completely wrong...) and I don't see much of a downside.
> > > > >
> > > > > Thoughts ?
> > > > >
> > > > > This is something that troubles me now while working on
> > Read.Unbounded,
> > > > and
> > > > > from a quick look I saw that the FlinkRunner expects "stable"
> > splitting
> > > > as
> > > > > well.. How does Dataflow handle this ?
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > > >
> > > >
> > >
> >
>
