UnboundedSources generate initial splits, which are basically splits of
them selves - for example, if an UnboundedKafkaSource is set to read from
topic T1 which is made of 2 partitions P1 and P2, it will (optimally) split
into two UnboundedKafkaSource, one per partition.
During the lifecycle of the "reader" bundles, CheckpointMarks are used to
checkpoint "last-read" and so readers may restart/resume. I'm assuming this
is how newly created partitions will automatically be added to readers.

The problem is that it's all fine while there is only one topic (Kafka
topic-partition pairs are ordered), but if reading from more then one topic
this may break:
T1,P1
T1,P2
T1,P3
T2,P1
The order is not maintained and T2,P1 is 4th now.

If splits (UnboundedSources) had an identifier, this could be avoided, and
checkpoints could be persisted accordingly.
For example, an UnboundedKafkaSource could use the hash code of it's
assigned topic-partition pairs.

I don't know how relevant this is to other Sources, but I guess it is as
long as they may grow their partitions dynamically (though I might be
completely wrong...) and I don't see much of a downside.

Thoughts ?

This is something that troubles me now while working on Read.Unbounded, and
from a quick look I saw that the FlinkRunner expects "stable" splitting as
well.. How does Dataflow handle this ?

Thanks,
Amit

Reply via email to