Oh! Okay, looks like this is a part of the code I was unfamiliar with. I'd
like to know the answer too, in this case.
+Daniel Mills <mil...@google.com> can you comment ?

On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <amitsel...@gmail.com> wrote:

> That is correct, as long as non of the Kafka topics "grow" another
> partition (which it could).
> In that case, some bundle will have to start reading from this partition as
> well, and since all other bundles already have a "previous checkpoint" it
> matters which checkpoint to relate to. I don't know how it's implemented in
> Dataflow, but in Spark I'm testing using mapWithState to store the
> checkpoints per split.
> It also seems that order does matter to the KafkIO:
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
>
> On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > Hi Amit,
> > Could you explain more about why you're saying the order of splits
> matters?
> > AFAIK the semantics of Read.Unbounded is "read from all of the splits in
> > parallel, checkpointing each of them independently", so their order
> > shouldn't matter.
> >
> > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <amitsel...@gmail.com> wrote:
> >
> > > UnboundedSources generate initial splits, which are basically splits of
> > > them selves - for example, if an UnboundedKafkaSource is set to read
> from
> > > topic T1 which is made of 2 partitions P1 and P2, it will (optimally)
> > split
> > > into two UnboundedKafkaSource, one per partition.
> > > During the lifecycle of the "reader" bundles, CheckpointMarks are used
> to
> > > checkpoint "last-read" and so readers may restart/resume. I'm assuming
> > this
> > > is how newly created partitions will automatically be added to readers.
> > >
> > > The problem is that it's all fine while there is only one topic (Kafka
> > > topic-partition pairs are ordered), but if reading from more then one
> > topic
> > > this may break:
> > > T1,P1
> > > T1,P2
> > > T1,P3
> > > T2,P1
> > > The order is not maintained and T2,P1 is 4th now.
> > >
> > > If splits (UnboundedSources) had an identifier, this could be avoided,
> > and
> > > checkpoints could be persisted accordingly.
> > > For example, an UnboundedKafkaSource could use the hash code of it's
> > > assigned topic-partition pairs.
> > >
> > > I don't know how relevant this is to other Sources, but I guess it is
> as
> > > long as they may grow their partitions dynamically (though I might be
> > > completely wrong...) and I don't see much of a downside.
> > >
> > > Thoughts ?
> > >
> > > This is something that troubles me now while working on Read.Unbounded,
> > and
> > > from a quick look I saw that the FlinkRunner expects "stable" splitting
> > as
> > > well.. How does Dataflow handle this ?
> > >
> > > Thanks,
> > > Amit
> > >
> >
>

Reply via email to