Filed https://issues.apache.org/jira/browse/BEAM-727.

As for initializing from workers, this is actually the opposite of what's
discussed in https://issues.apache.org/jira/browse/BEAM-704.
IMO Kafka metadata should be accessed once, before splitting, since this
metadata eventually determines the splitting (and for other reasons
mentioned in BEAM-704).
As for the "Driver/Launcher" program accessing Kafka, I guess it could run
on a worker, right? I can only speak for Spark: when running on YARN it can
run in "cluster mode", i.e., the driver program itself runs in a YARN
container as well.
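
To make the consistent-assignment idea discussed below concrete, here is a
rough sketch (illustrative only, not KafkaIO's actual code; the class and
method names are made up) of fetching partition metadata once up front and
assigning partitions to splits round-robin, per topic independently, so a
partition's split stays stable as long as desiredNumSplits doesn't change:

// Illustrative sketch only -- not KafkaIO's actual implementation.
// Shows (a) fetching partition metadata once, before splitting, and
// (b) assigning partitions to splits round-robin, per topic independently.
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class PartitionAssignmentSketch {

  static List<List<TopicPartition>> assignSplits(
      KafkaConsumer<?, ?> consumer, List<String> topics, int desiredNumSplits) {
    List<List<TopicPartition>> splits = new ArrayList<>(desiredNumSplits);
    for (int i = 0; i < desiredNumSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (String topic : topics) {
      // Metadata is fetched once here, before the splits are generated.
      List<PartitionInfo> partitions = consumer.partitionsFor(topic);
      for (PartitionInfo p : partitions) {
        // Round-robin per topic: partition i of every topic goes to split
        // (i % desiredNumSplits), independent of the other topics. Because
        // the mapping depends only on the partition number, a newly added
        // partition lands on a deterministic split without reshuffling the
        // existing assignment (as long as desiredNumSplits stays the same).
        splits.get(p.partition() % desiredNumSplits)
            .add(new TopicPartition(topic, p.partition()));
      }
    }
    return splits;
  }
}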

On Tue, Oct 4, 2016 at 10:39 PM Raghu Angadi <rang...@google.com.invalid>
wrote:

> On Wed, Sep 14, 2016 at 1:43 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
> > >
> > > For generateInitialSplits, the UnboundedSource API doesn't require
> > > deterministic splitting (although it's recommended), and a
> PipelineRunner
> > > should keep track of the initially generated splits.
> > >
> > If the splitting were to be consistent, in such a way that newly added
> > partitions would be assigned a new "splitId" while existing ones would
> > still be assigned the same (consistent) splitId, it could support
> > newly added partitions, no?
>
>
> Yes, consistently assigning the partitions will let us do this. I wouldn't
> hash, though; it would not distribute partitions evenly when the number of
> partitions is low (say 2 or 10, which is a pretty common case). We can
> assign consistently even with a round-robin assignment. The current
> assignment would work, except when we are reading from multiple topics. We
> can update it to handle multiple topics better (assign the partitions of
> each topic independently).
>
> This strategy would still depend on a strong guarantee on the
> generateInitialSplits() interface where 'desiredNumSplits' stays the same
> across updates.
>
> Please file a jira for adding support for handling change in Kafka
> partitions.
>
> In fact, KafkaIO should probably not fetch partition info from inside
> generateInitialSplits() at all (sometimes the Kafka cluster might not be
> accessible from where the job is launched) and instead do all the
> initialization from the workers, even though it implies multiple fetches of
> Kafka metadata.
>
