For Kafka, I don't think you're over-splitting if you split according to
Kafka partitions.

If your backend provides enough parallelism, you'll get 1:1 parallelism
(source splits to Kafka partitions) from KafkaIO today.
The problem is when the backend does not provide enough parallelism:

   - Current implementation: a split is assigned more than one Kafka
   partition to read from, and since a split-read-task (is that what it's
   called?) is executed by a single thread (an assumption that might not
   hold for all runners), you'll "round-robin" over the assigned Kafka
   partitions.
   - "Over-splitting": each split-read-task reads from exactly one Kafka
   partition, but there are queued split-read-tasks (see the sketch after
   this list).
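
To make that concrete, here's a minimal sketch of the two assignment
strategies (hypothetical code, not the actual KafkaIO implementation;
TopicPartition is the Kafka client class):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    public class SplitAssignmentSketch {

      // Current behavior: exactly 'desiredNumSplits' splits are created and
      // the partitions are round-robined over them, so a single split may
      // end up reading from several partitions.
      static List<List<TopicPartition>> currentAssignment(
          List<TopicPartition> partitions, int desiredNumSplits) {
        List<List<TopicPartition>> splits = new ArrayList<>();
        for (int i = 0; i < desiredNumSplits; i++) {
          splits.add(new ArrayList<TopicPartition>());
        }
        for (int i = 0; i < partitions.size(); i++) {
          splits.get(i % desiredNumSplits).add(partitions.get(i));
        }
        return splits;
      }

      // "Over-splitting": one split per partition; the hint is ignored, and
      // a runner with fewer read threads than splits queues the extra
      // split-read-tasks.
      static List<List<TopicPartition>> oneSplitPerPartition(
          List<TopicPartition> partitions) {
        List<List<TopicPartition>> splits = new ArrayList<>();
        for (TopicPartition tp : partitions) {
          List<TopicPartition> single = new ArrayList<>();
          single.add(tp);
          splits.add(single);
        }
        return splits;
      }
    }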

Does this make sense?

Looking at it this way, which is better is probably runner-dependent. In
terms of performance, I don't think it would matter for Spark, but
over-splitting shouldn't degrade performance, and it can handle dynamically
growing Kafka topics.
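
As a rough illustration of the hashing idea I suggest in my reply below
(hypothetical code, not an actual Beam API): if each reader's CheckpointMark
is keyed by the split's properties (topic-partition), an existing partition
resumes from its saved state while a newly added partition finds nothing and
just starts from latest/earliest:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class SplitKeyedStateSketch {

      // Stand-in for a reader's CheckpointMark: just the next offset.
      static class CheckpointMark {
        final long offset;
        CheckpointMark(long offset) { this.offset = offset; }
      }

      // Key a split by its defining properties (simplified; a real
      // implementation would need a collision-free key, not a hash).
      static int splitKey(String topic, int partition) {
        return Objects.hash(topic, partition);
      }

      public static void main(String[] args) {
        Map<Integer, CheckpointMark> state = new HashMap<>();
        state.put(splitKey("events", 0), new CheckpointMark(1234L));

        // Existing partition: resume from its checkpoint.
        System.out.println(
            "partition 0 resumes at " + state.get(splitKey("events", 0)).offset);

        // Newly added partition: no saved state, start fresh.
        System.out.println(
            "partition 70 prior state: " + state.get(splitKey("events", 70)));
      }
    }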


On Mon, Nov 14, 2016 at 8:14 AM Davor Bonaci <[email protected]>
wrote:

> Luke is bringing up great questions, I think.
>
> My first impression is that the current state is "possibly under-split",
> and the proposal is to move us to a "possibly over-split" state. Neither
> is the ideal solution, as I'm sure we can find scenarios where either
> performs poorly. That said, if we aren't really solving the problem (at
> this time), I can believe that "over-split" is better than "under-split".
>
> (Job update is not a Beam consideration at this time, so none of that
> applies.)
>
> Davor
>
> On Fri, Nov 11, 2016 at 7:55 AM, Lukasz Cwik <[email protected]>
> wrote:
>
> > Why is it that we don't generate initial splits after the pipeline has
> > been created and the runner is processing it?
> >
> > This would allow a runner to look at the old state of the pipeline and
> > see how many splits there were.
> > This would allow the runner to provide a hint as to how many splits it
> > wants.
> > This brings it in line with how bounded sources work, where the
> > splitting is performed once the pipeline has started.
> >
> > On Fri, Nov 11, 2016 at 8:09 AM, Amit Sela <[email protected]> wrote:
> >
> > > +1
> > > I think this makes more sense than the existing form of a split that
> > > is made of several Kafka partitions since, as mentioned, Kafka
> > > partitions are in fact its unit of parallelism.
> > >
> > > As for supporting a change in the number of partitions (mainly, added
> > > partitions), I'll suggest something I brought up before, which might
> > > make more sense now: hash an UnboundedSource according to its split's
> > > properties (topic-partition in this case). This allows keying the
> > > stream by the source in a way that the reader's CheckpointMark is tied
> > > to the split, so if a "new split" is created (a new partition added to
> > > a topic the pipeline consumes) its reader's state is non-existent
> > > (starting from latest/earliest), while the rest (of the readers) will
> > > pick up where they left off.
> > > I think this also avoids the need to "remember" the original degree
> > > of parallelism.
> > >
> > > Thanks,
> > > Amit
> > >
> > > On Fri, Nov 11, 2016 at 4:22 AM Raghu Angadi <[email protected]>
> > > wrote:
> > >
> > > > I would like to propose a change to how many splits (sources)
> > > > KafkaIO creates. The code changes are relatively simple, but it has
> > > > a couple of drawbacks I would like to discuss here.
> > > >
> > > > KafkaIO currently takes the '*desiredNumWorkers
> > > > <https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L642>*'
> > > > hint literally and returns exactly that many splits. If
> > > > *desiredNumWorkers* is 10 and the topic has 50 partitions, each
> > > > Kafka source reads from 5 partitions.
> > > >
> > > > The primary disadvantage is that the runner-dependent
> > > > 'desiredNumWorkers' might not be accurate. In Dataflow, it is
> > > > particularly low when we set 'maxNumWorkers' (BEAM-958
> > > > <https://issues.apache.org/jira/browse/BEAM-958>). In addition, the
> > > > number of partitions in Kafka is a really good indicator of its
> > > > parallelism.
> > > >
> > > > I would like to change KafkaIO to return one split for each of the
> > > > partitions.
> > > >
> > > > Pros:
> > > >
> > > >    - A partition is in fact the unit of parallelism in Kafka.
> > > >    - Does not depend on 'desiredNumWorkers'.
> > > >    - Little risk of having an unreasonably large number of splits
> > > >    (unlike, say, a source with one split per file). The number of
> > > >    partitions tends to be on the order of the Kafka cluster size.
> > > >
> > > > Cons: mainly affects job update:
> > > >
> > > >    - Breaks updating an existing job
> > > >    <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline>
> > > >    if it is updated to a newer version of KafkaIO. The new version
> > > >    changes the number of splits returned, which is not allowed
> > > >    during update.
> > > >       - I think this is a reasonable breakage at this stage.
> > > >       - The vast majority of updates don't involve a version change.
> > > >       - We could add a workaround where the user can explicitly set
> > > >       the number of splits in KafkaIO (this might be required to
> > > >       handle a change in partitions as well, see below).
> > > >    - Makes it a bit more difficult to support a change in the number
> > > >    of Kafka partitions across an update.
> > > >       - This is not a feature in KafkaIO yet, so it is not a new
> > > >       breakage.
> > > >       - If we don't depend on 'desiredNumWorkers', there is no way
> > > >       for us to know how many splits we had before the update. This
> > > >       is actually a limitation of the UnboundedSource API;
> > > >       UnboundedSource needs multiple tweaks to support job update
> > > >       better. In that sense I don't think this should be a blocker.
> > > >       - A workaround is to let the user explicitly set the number of
> > > >       splits. E.g.:
> > > >          - When a job starts, say we had 70 partitions, and after
> > > >          some time we add 10 more partitions.
> > > >          - At runtime, each Kafka split notices these and can
> > > >          distribute the new partitions among the existing 70.
> > > >          - But when the job is updated, KafkaIO does not know that
> > > >          it had only 70 partitions earlier.
> > > >          - For this to work, the user could set the number of splits
> > > >          to 70 explicitly.
> > > >
> > > >
> > > > Thanks.
> > > > Raghu.
> > > >
> > >
> >
>
