Filed https://issues.apache.org/jira/browse/BEAM-727.
As for initializing from workers, this is actually the opposite of what's
discussed in https://issues.apache.org/jira/browse/BEAM-704. IMO Kafka
metadata should be accessed once, before splitting, since this metadata
eventually determines the splitting (and for other reasons mentioned in
BEAM-704).

As for the "Driver/Launcher" program accessing Kafka, I guess it could run
on a worker, right? I can only speak for Spark: when running on YARN it can
run in "cluster-mode", which runs the driver program in a YARN container as
well.

On Tue, Oct 4, 2016 at 10:39 PM Raghu Angadi <rang...@google.com.invalid>
wrote:

> On Wed, Sep 14, 2016 at 1:43 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
> > > For generateInitialSplits, the UnboundedSource API doesn't require
> > > deterministic splitting (although it's recommended), and a
> > > PipelineRunner should keep track of the initially generated splits.
> >
> > If the splitting were to be consistent, in such a way that newly added
> > partitions would be assigned a new "splitId" while existing ones would
> > still be assigned the same (consistent) splitId, it could support
> > newly added partitions, no?
>
> Yes, consistently assigning the partitions will let us do this. I wouldn't
> hash though, it would not distribute partitions evenly when the number of
> partitions is low (say 2 or 10, which is a pretty common case). We can
> assign consistently even with a round-robin assignment. The current
> assignment would work, except when we are reading from multiple topics.
> We can update it to handle multiple topics better (assign the partitions
> of each topic independently).
>
> This strategy would still depend on a strong guarantee on the
> generateInitialSplits() interface that 'desiredNumSplits' stays the same
> across updates.
>
> Please file a jira for adding support for handling changes in Kafka
> partitions.
>
> In fact, KafkaIO should probably not fetch partition info from inside
> generateInitialSplits() at all (sometimes the Kafka cluster might not be
> accessible from where the job is launched) and instead do all the
> initialization from the workers, even though it implies multiple fetches
> of Kafka metadata.
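
To make the per-topic round-robin idea quoted above a bit more concrete,
here is a minimal sketch. It is not KafkaIO's actual code: the class
RoundRobinSplits, its nested TopicPartition type and the assign() method
are all hypothetical names made up for illustration, and it assumes that
numSplits is the same on every call (the 'desiredNumSplits' guarantee
mentioned in the thread).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; none of these names come from KafkaIO.
final class RoundRobinSplits {

  /** Minimal stand-in for a Kafka topic/partition pair. */
  static final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }

    @Override
    public String toString() {
      return topic + "-" + partition;
    }
  }

  /**
   * Assigns each topic's partitions round-robin, independently of other
   * topics: partition p always lands in split (p % numSplits). An existing
   * partition therefore keeps its split when new partitions are added,
   * as long as numSplits does not change between calls.
   */
  static List<List<TopicPartition>> assign(
      List<TopicPartition> partitions, int numSplits) {
    if (numSplits <= 0) {
      throw new IllegalArgumentException("numSplits must be positive");
    }

    // Sort a copy so the order inside each split is deterministic.
    List<TopicPartition> sorted = new ArrayList<>(partitions);
    sorted.sort(
        Comparator.comparing((TopicPartition tp) -> tp.topic)
            .thenComparingInt(tp -> tp.partition));

    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (TopicPartition tp : sorted) {
      splits.get(tp.partition % numSplits).add(tp);
    }
    return splits;
  }
}
```

One caveat with this particular sketch: every topic starts at split 0, so
reading many single-partition topics would pile them all onto the first
split. A per-topic starting offset could even that out, but it would need
to be derived in a way that stays stable across updates.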