On Wed, Sep 14, 2016 at 1:43 PM, Amit Sela <[email protected]> wrote:

> >
> > For generateInitialSplits, the UnboundedSource API doesn't require
> > deterministic splitting (although it's recommended), and a PipelineRunner
> > should keep track of the initially generated splits.
> >
> If the splitting were to be consistent, in such way that newly added
> partitions would be assigned with a new "splitId" while existing ones would
> still be assigned with the same (consistent) splitId, it could support
> newly added partitions, no ?


Yes, consistently assigning the partitions will let us do this. I wouldn't
hash though, it would not distribute partitions evenly when the number of
partitions is low (say 2 or 10, which is pretty common case). We can assign
consistently even with a round-robin assignment. Current assignment would
work, except when we are reading from multiple topics. We can update it to
handle multiple topics better (assign each partitions for each topic
independently).

This strategy would still depend on strong guarantee on
generateInitialSplits() interface where 'desiredNumSplits' stays same
across updates.

Please file a jira for adding support for handling change in Kafka
partitions.

In fact, KafkaIO should probably not fetch partitions info form inside
generateIntialSplits() at all (sometimes Kafka cluster might not be
accessible from where the job is launched from) and instead do all the
initialization from the workers, even though it implies multiple fetches of
Kafka metadata.

Reply via email to