On Wed, Sep 14, 2016 at 1:43 PM, Amit Sela <[email protected]> wrote:
> > For generateInitialSplits, the UnboundedSource API doesn't require
> > deterministic splitting (although it's recommended), and a PipelineRunner
> > should keep track of the initially generated splits.
>
> If the splitting were to be consistent, in such way that newly added
> partitions would be assigned with a new "splitId" while existing ones would
> still be assigned with the same (consistent) splitId, it could support
> newly added partitions, no ?

Yes, consistently assigning the partitions will let us do this. I wouldn't
hash, though: hashing would not distribute partitions evenly when the number
of partitions is low (say 2 or 10, which is a pretty common case). We can
assign consistently even with a round-robin assignment.

The current assignment would work, except when we are reading from multiple
topics. We can update it to handle multiple topics better (assign the
partitions of each topic independently). This strategy would still depend on
a strong guarantee from the generateInitialSplits() interface that
'desiredNumSplits' stays the same across updates.

Please file a JIRA for adding support for handling changes in Kafka
partitions. In fact, KafkaIO should probably not fetch partition info from
inside generateInitialSplits() at all (sometimes the Kafka cluster might not
be accessible from where the job is launched) and should instead do all the
initialization on the workers, even though that implies multiple fetches of
Kafka metadata.
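A rough sketch of the per-topic round-robin idea, for illustration only. It is
not KafkaIO code: the class PartitionAssignment, the method assign() and the
TopicPartition record are hypothetical stand-ins (the record is not Kafka's own
TopicPartition class), and numSplits plays the role of desiredNumSplits. The
assumption is that each topic's partitions are numbered 0..N-1, so partition k
of every topic always lands on split k % numSplits. Adding partitions to one
topic then never moves existing partitions of any topic, provided numSplits
does not change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: per-topic round-robin assignment of partitions to splits. */
public class PartitionAssignment {

  /** Minimal stand-in for a topic/partition pair (not Kafka's own class). */
  public record TopicPartition(String topic, int partition) {}

  /**
   * Returns one list of partitions per split. Partition k of each topic goes to
   * split (k % numSplits), computed independently per topic, so the result
   * depends only on (topic, partition number, numSplits).
   */
  public static List<List<TopicPartition>> assign(
      Map<String, Integer> partitionsPerTopic, int numSplits) {
    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    // Visit topics in sorted order so the order inside each split is deterministic.
    for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
      for (int k = 0; k < e.getValue(); k++) {
        splits.get(k % numSplits).add(new TopicPartition(e.getKey(), k));
      }
    }
    return splits;
  }

  public static void main(String[] args) {
    // "orders" grows from 2 to 3 partitions; every existing partition keeps its split.
    System.out.println(assign(Map.of("orders", 2, "clicks", 3), 2));
    System.out.println(assign(Map.of("orders", 3, "clicks", 3), 2));
  }
}
```

By contrast, a single round-robin over the flattened, sorted list of all
partitions would shift the split of every partition that sorts after a newly
added one. One trade-off of the per-topic scheme: many topics with only one or
two partitions each would all pile onto the low-numbered splits, so some
per-topic offset might be worth considering.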
