Luke is bringing up great questions, I think. My first impression is that the
current state is "possibly under-split", and the proposal would move us to a
"possibly over-split" state. Neither is ideal; I'm sure we can find scenarios
in which either performs poorly. That said, if we aren't really solving the
problem (at this time), I can believe that "over-split" is better than
"under-split".
(Job update is not a Beam consideration at this time, so none of that
applies.)

Davor

On Fri, Nov 11, 2016 at 7:55 AM, Lukasz Cwik <[email protected]> wrote:

> Why is it that we don't generate initial splits after the pipeline has
> been created and the runner is processing it?
>
> This would allow a runner to look at the old state of the pipeline and
> see how many splits there were. It would also allow the runner to provide
> a hint as to how many splits it wants. And it would bring this in line
> with how bounded sources work, where splitting is performed once the
> pipeline has started.
>
> On Fri, Nov 11, 2016 at 8:09 AM, Amit Sela <[email protected]> wrote:
>
> > +1
> > I think this makes more sense than the existing form of a split that
> > is made up of several Kafka partitions since, as mentioned, Kafka
> > partitions are in fact its parallelism.
> >
> > As for supporting a change in the number of partitions (mainly, added
> > partitions), I'll suggest something I brought up before that might
> > make more sense now: hashing an UnboundedSource according to its
> > split's properties (topic-partition in this case). This would allow
> > keying the stream by the source in a way that ties the reader's
> > CheckpointMark to the split; if a "new split" is created (a new
> > partition added to a topic the pipeline consumes), its reader's state
> > is non-existent (starting from latest/earliest), while the rest of the
> > readers pick up where they left off (see the second sketch after the
> > thread).
> > I think this also avoids the need to "remember" the original degree of
> > parallelism.
> >
> > Thanks,
> > Amit
> >
> > On Fri, Nov 11, 2016 at 4:22 AM Raghu Angadi <[email protected]>
> > wrote:
> >
> > > I would like to propose a change to how many splits (sources)
> > > KafkaIO creates. The code changes are relatively simple, but it has
> > > a couple of drawbacks I would like to discuss here.
> > >
> > > KafkaIO currently takes the 'desiredNumWorkers
> > > <https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L642>'
> > > hint literally and returns exactly that many splits. If
> > > 'desiredNumWorkers' is 10 and the topic has 50 partitions, each
> > > Kafka source reads from 5 partitions.
> > >
> > > The primary disadvantage is that the runner-dependent
> > > 'desiredNumWorkers' might not be accurate. In Dataflow, it is
> > > particularly low when we set 'maxNumWorkers' (BEAM-958
> > > <https://issues.apache.org/jira/browse/BEAM-958>). In addition, the
> > > number of partitions in Kafka is a really good indicator of its
> > > parallelism.
> > >
> > > I would like to change KafkaIO to return one split for each of the
> > > partitions (see the first sketch after the thread).
> > >
> > > Pros:
> > >
> > >    - A partition is in fact the unit of parallelism in Kafka.
> > >    - Does not depend on 'desiredNumWorkers'.
> > >    - Little risk of having an unreasonably large number of splits
> > >    (unlike, say, a source with one split per file). The number of
> > >    partitions tends to be on the order of the Kafka cluster size.
> > >
> > > Cons (mainly affecting job update):
> > >
> > >    - Breaks updating an existing job
> > >    <https://cloud.google.com/dataflow/pipelines/updating-a-pipeline>
> > >    if it is updated to a newer version of KafkaIO. The new version
> > >    changes the number of splits returned, which is not allowed
> > >    during update.
> > >       - I think this is a reasonable breakage at this stage.
> > >       - The vast majority of updates don't involve a version change.
> > >       - We could add a workaround where the user can explicitly set
> > >       the number of splits in KafkaIO (this might be required to
> > >       handle a change in partitions as well; see below).
> > >    - Makes it a bit more difficult to support a change in the number
> > >    of Kafka partitions across an update.
> > >       - This is not a feature in KafkaIO yet, so it is not a new
> > >       breakage.
> > >       - If we don't depend on 'desiredNumWorkers', there is no way
> > >       for us to know how many splits we had before the update. This
> > >       is actually a limitation of the UnboundedSource API;
> > >       UnboundedSource needs multiple tweaks to support job update
> > >       better. In that sense I don't think this should be a blocker.
> > >       - A workaround is to let the user explicitly set the number of
> > >       splits. E.g.:
> > >          - When a job starts, say we have 70 partitions, and after
> > >          some time we add 10 more partitions.
> > >          - At runtime, each Kafka split notices these, and the new
> > >          partitions can be distributed among the existing 70 splits.
> > >          - But when the job is updated, KafkaIO does not know that
> > >          it had only 70 partitions earlier.
> > >          - For this to work, the user could set the number of splits
> > >          to 70 explicitly.
> > >
> > > Thanks.
> > > Raghu.
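
For concreteness, here is a minimal sketch of the one-split-per-partition
proposal (the first sketch referenced above). It is an editor's illustration
in plain Java, not the actual KafkaIO code: the class name, the helper name,
and the consumer/topics parameters are all assumptions. The only real API
used is the Kafka consumer's partitionsFor(), which returns the live
partition metadata for a topic. In the real KafkaIO this logic would live in
the UnboundedSource's split-generation method.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class OnePartitionPerSplit {
      // Returns one single-partition "split" per Kafka partition,
      // ignoring any 'desiredNumWorkers' hint entirely.
      public static List<List<TopicPartition>> onePartitionPerSplit(
          KafkaConsumer<?, ?> consumer, List<String> topics) {
        List<List<TopicPartition>> splits = new ArrayList<>();
        for (String topic : topics) {
          for (PartitionInfo info : consumer.partitionsFor(topic)) {
            // Each split owns exactly one partition, so the split count
            // always matches the topic's actual partition count.
            splits.add(Collections.singletonList(
                new TopicPartition(info.topic(), info.partition())));
          }
        }
        return splits;
      }
    }

Note how the desired-splits hint plays no role here, which is exactly the
pro and the con discussed above: the split count is always accurate, but it
can change between pipeline versions whenever partitions are added.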

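And a minimal sketch of Amit's keying idea (the second sketch referenced
above): give each split a stable identity derived only from its
topic-partition, so checkpoint state can be looked up by that identity
rather than by split index. The class name and the hashing scheme are
illustrative assumptions, not Beam API.

    import java.util.Objects;

    import org.apache.kafka.common.TopicPartition;

    // Stable per-split identity: depends only on the split's own
    // properties, never on how many splits exist or in what order they
    // were generated.
    public final class SplitId {
      private final String topic;
      private final int partition;

      public SplitId(TopicPartition tp) {
        this.topic = tp.topic();
        this.partition = tp.partition();
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SplitId)) {
          return false;
        }
        SplitId other = (SplitId) o;
        return partition == other.partition && topic.equals(other.topic);
      }

      @Override
      public int hashCode() {
        // Stable across runs and across updates.
        return Objects.hash(topic, partition);
      }
    }

With state kept in, say, a Map<SplitId, CheckpointMark>, an existing
partition finds its mark by identity after an update, while a newly added
partition simply misses and starts from latest/earliest, with no need to
remember the original split count.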