> If splits (UnboundedSources) had an identifier, this could be avoided, and checkpoints could be persisted accordingly.
The order of the splits that a source returns is preserved. So during an update, you can expect the 5th split to be invoked with the same checkpoint mark that the 5th split saved before the upgrade. You can hash <topic, partition> to one of the indices. KafkaIO.java does not support a change in partitions.

On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <[email protected]> wrote:

> Oh! Okay, looks like this is a part of the code I was unfamiliar with. I'd
> like to know the answer too, in this case.
> +Daniel Mills <[email protected]> can you comment ?
>
> On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]> wrote:
>
> > That is correct, as long as none of the Kafka topics "grow" another
> > partition (which it could).
> > In that case, some bundle will have to start reading from this partition as
> > well, and since all other bundles already have a "previous checkpoint" it
> > matters which checkpoint to relate to. I don't know how it's implemented in
> > Dataflow, but in Spark I'm testing using mapWithState to store the
> > checkpoints per split.
> > It also seems that order does matter to the KafkaIO:
> >
> > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
> >
> > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> > <[email protected]> wrote:
> >
> > > Hi Amit,
> > > Could you explain more about why you're saying the order of splits matters?
> > > AFAIK the semantics of Read.Unbounded is "read from all of the splits in
> > > parallel, checkpointing each of them independently", so their order
> > > shouldn't matter.
> > >
> > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <[email protected]> wrote:
> > >
> > > > UnboundedSources generate initial splits, which are basically splits of
> > > > themselves - for example, if an UnboundedKafkaSource is set to read from
> > > > topic T1 which is made of 2 partitions P1 and P2, it will (optimally) split
> > > > into two UnboundedKafkaSource, one per partition.
> > > > During the lifecycle of the "reader" bundles, CheckpointMarks are used to
> > > > checkpoint "last-read" and so readers may restart/resume. I'm assuming this
> > > > is how newly created partitions will automatically be added to readers.
> > > >
> > > > The problem is that it's all fine while there is only one topic (Kafka
> > > > topic-partition pairs are ordered), but if reading from more than one topic
> > > > this may break:
> > > > T1,P1
> > > > T1,P2
> > > > T1,P3
> > > > T2,P1
> > > > The order is not maintained and T2,P1 is 4th now.
> > > >
> > > > If splits (UnboundedSources) had an identifier, this could be avoided, and
> > > > checkpoints could be persisted accordingly.
> > > > For example, an UnboundedKafkaSource could use the hash code of its
> > > > assigned topic-partition pairs.
> > > >
> > > > I don't know how relevant this is to other Sources, but I guess it is as
> > > > long as they may grow their partitions dynamically (though I might be
> > > > completely wrong...) and I don't see much of a downside.
> > > >
> > > > Thoughts ?
> > > >
> > > > This is something that troubles me now while working on Read.Unbounded, and
> > > > from a quick look I saw that the FlinkRunner expects "stable" splitting as
> > > > well.. How does Dataflow handle this ?
> > > >
> > > > Thanks,
> > > > Amit
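
For reference, a minimal Java sketch of the hashing idea discussed in this thread: map each <topic, partition> pair to a split index using only the pair itself, so that the index does not depend on where the pair appears in the list. The class StableSplitAssignment and its methods are hypothetical names used for illustration. This is not KafkaIO's code, and it assumes the number of splits stays the same across restarts.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper (not part of KafkaIO): order-independent split assignment. */
final class StableSplitAssignment {

  private StableSplitAssignment() {}

  /** Returns an index in [0, numSplits) that depends only on (topic, partition). */
  static int splitIndex(String topic, int partition, int numSplits) {
    if (numSplits <= 0) {
      throw new IllegalArgumentException("numSplits must be positive");
    }
    // floorMod keeps the result non-negative even when the hash is negative.
    return Math.floorMod(Objects.hash(topic, partition), numSplits);
  }

  /**
   * Groups "topic,partition" labels into numSplits buckets. The bucket a pair
   * lands in does not depend on the order of the input list.
   */
  static List<List<String>> assign(List<Map.Entry<String, Integer>> pairs, int numSplits) {
    List<List<String>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (Map.Entry<String, Integer> tp : pairs) {
      int idx = splitIndex(tp.getKey(), tp.getValue(), numSplits);
      splits.get(idx).add(tp.getKey() + "," + tp.getValue());
    }
    return splits;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    pairs.add(new SimpleImmutableEntry<>("T1", 1));
    pairs.add(new SimpleImmutableEntry<>("T1", 2));
    pairs.add(new SimpleImmutableEntry<>("T2", 1));
    int before = splitIndex("T2", 1, 4);

    // T1 "grows" a third partition; T2,P1 moves from 3rd to 4th in the list.
    pairs.add(2, new SimpleImmutableEntry<>("T1", 3));
    int after = splitIndex("T2", 1, 4);

    System.out.println("T2,1 kept its split index: " + (before == after));
    System.out.println(assign(pairs, 4));
  }
}
```

With this kind of mapping, T2,P1 keeps its index when T1 gains a partition, at the cost of possibly uneven or empty splits, since a hash does not balance pairs across indices.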
