I think part of Yasu's motivation for a cluster-wide partitioning scheme is that, for example, there could be multiple stream jobs reading from / writing to some shared topics but controlled by different teams or services inside an organization. If one team mistakenly specifies the partitioning in the wrong way, it will interfere with the other teams; hence global management of the partitioning scheme may be required, much like a global schema registry service for Kafka.
With the producer's Partitioner interface, we can still implement different partitioning schemes for different topics in a single class: just branch on the topic name and cast the key and value types, but that would be a bit awkward. So I prefer a customizable partitioner in the sink spec for better user programmability.

Guozhang

On Wed, Oct 14, 2015 at 1:03 PM, Randall Hauch <rha...@gmail.com> wrote:

> It absolutely is important that the partitioning logic for a single topic be the same across an entire cluster. IOW, if a topology has a single sink, then no matter where that topology is run in the cluster, it had better use the same partitioning logic. I would argue that when the partitioning logic varies from the default logic, it’s far better to encapsulate it within the topology’s definition, and adding it to the sink is a very easy way to do this (and very natural for the developer using Kafka Streams).
>
> However, centralizing the partitioning logic for all streams is certainly not ideal, primarily because different topics will likely need to be partitioned in different ways. This is especially true for stateful stream processing, which depends on messages with the same key going to the same processor instance that owns that keyed data. IOW, the partitioning logic used by a producer is strongly informed by how the *downstream stateful consumers* are organized/clustered. It gets far more complicated when considering built-in topics used for offset management, state storage, and metrics.
>
> The bottom line is that *different* topics will likely need to be partitioned differently.
>
> On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda (yasuhiro.mats...@gmail.com) wrote:
>
> A partitioning scheme should be a cluster-wide thing. Letting each sink have a different partitioning scheme does not make sense to me. A partitioning scheme is not specific to a stream job, each task, or a sink. I think specifying it at the sink level is more error prone.
>
> If a user wants to customize a partitioning scheme, he/she also wants to manage it in some central place, maybe a code repo or a jar file. All applications must use the same logic, otherwise data will be messed up. Thus, a single class representing all partitioning logic is not a bad thing at all. (Code-organization-wise, all the logic does not necessarily live in that single class, of course.)
>
> On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch <rha...@gmail.com> wrote:
>
> > Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a PR with the proposed change.
> >
> > Thanks!
> >
> > On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com) wrote:
> >
> > Thanks!
> >
> > On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch <rha...@gmail.com> wrote:
> >
> > Ok, cool. I agree we want something simple. I'll create an issue and create a pull request with a proposal. Look for it tomorrow.
> >
> > On Oct 13, 2015, at 10:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > I see your point. Yeah, I think it is a good way to add a Partitioner into addSink(...), but the Partitioner interface in the producer is a bit overkill:
> >
> > "partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)"
> >
> > whereas for us we only want to partition on (K key, V value).
> >
> > Perhaps we should add a new Partitioner interface in Kafka Streams?
> >
> > Guozhang
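For illustration only, here is a minimal sketch of what such a Streams-side partitioner interface could look like, seeing nothing but the typed key and value. The name, the nullable return, and the numPartitions parameter are assumptions made for this sketch, not an API agreed on in this thread.

    // Hypothetical Streams-side partitioner: works on the typed key/value only,
    // unlike the producer's byte-level Partitioner quoted above.
    public interface StreamPartitioner<K, V> {
        /**
         * Determine the partition for the given record, or return null to fall
         * back to the producer's default partitioning for the sink topic.
         */
        Integer partition(K key, V value, int numPartitions);
    }

An overloaded TopologyBuilder.addSink(...) could then accept one instance of such an interface per sink, which matches the per-sink approach Randall describes below.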
> > On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:
> >
> > This overrides the partitioning logic for all topics, right? That means I have to explicitly call the default partitioning logic for all topics except those that my Producer forwards. I guess the best way to do that is by extending org.apache.kafka.clients.producer.internals.DefaultPartitioner. Of course, with multiple sinks in my topology, I have to put all of the partitioning logic inside a single class.
> >
> > What would you think about adding an overloaded TopologyBuilder.addSink(…) method that takes a Partitioner (or, better yet, a smaller functional interface), so that the resulting SinkProcessor could use that Partitioner instance to set the partition number? That’d be super convenient for users, would keep the logic where it belongs (where the topology defines the sinks), and best of all the implementations won't have to worry about any other topics, such as those used by stores, metrics, or other sinks.
> >
> > Best regards,
> >
> > Randall
> >
> > On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) wrote:
> >
> > Hi Randall,
> >
> > You can try to set the partitioner class as ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; its interface can be found in
> >
> > org.apache.kafka.clients.producer.Partitioner
> >
> > Let me know if it works for you.
> >
> > Guozhang
> >
> > On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:
> >
> > > The new streams API added with KIP-28 is great. I’ve been using it on a prototype for a few weeks, and I’m looking forward to it being included in 0.9.0. However, at the moment, a Processor implementation is not able to specify the partition number when it outputs messages.
> > >
> > > I’d be happy to log a JIRA and create a PR to add it to the API, but without knowing all of the history I’m wondering if leaving it out of the API was intentional.
> > >
> > > Thoughts?
> > >
> > > Best regards,
> > >
> > > Randall Hauch

--
-- Guozhang
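For completeness, a rough sketch of the producer-level route discussed above: a single Partitioner implementation that branches on the topic name and casts the key type per topic, registered via ProducerConfig.PARTITIONER_CLASS_CONFIG through the streams configuration. The topic name, key type, and hashing below are illustrative assumptions, not anything prescribed in this thread.

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    // One cluster-wide partitioner that branches on the topic name
    // (hypothetical topic/key names used for illustration only).
    public class PerTopicPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) {
            // no configuration needed for this sketch
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            // guard against missing metadata for the topic
            int numPartitions = Math.max(cluster.partitionsForTopic(topic).size(), 1);
            if ("orders".equals(topic)) {
                // hypothetical topic keyed by a String customer id
                String customerId = (String) key;
                return (customerId.hashCode() & 0x7fffffff) % numPartitions;
            }
            // simple fallback for every other topic, including internal ones
            int hash = keyBytes == null ? 0 : java.util.Arrays.hashCode(keyBytes);
            return (hash & 0x7fffffff) % numPartitions;
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }

It would then be wired in with something like props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PerTopicPartitioner.class.getName()) before building the streams configuration. As the thread notes, every topic the producer writes to, including internal ones, flows through this one class, which is exactly the awkwardness the per-sink partitioner proposal is meant to avoid.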