A partitioning scheme should be a cluster-wide thing. Letting each sink have a different partitioning scheme does not make sense to me. A partitioning scheme is not specific to a stream job, a task, or a sink. I think specifying it at the sink level is more error-prone than configuring it once for the whole cluster.
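To make the cluster-wide approach concrete, here is a minimal sketch of partitioning logic kept in one shared class that every application would call. The class name `ClusterPartitioning`, the `customer-events` topic, and its `<customerId>:<eventId>` key format are hypothetical. The fallback uses `hashCode()` rather than Kafka's real default (which hashes the serialized key bytes with murmur2), so this illustrates the idea and is not a drop-in replacement.

```java
import java.util.Objects;

// Hypothetical cluster-wide partitioning logic, published once (e.g. in a shared jar)
// so that every application computes the same partition for the same topic and key.
final class ClusterPartitioning {

    private ClusterPartitioning() {
        // Static utility; not meant to be instantiated.
    }

    // Returns a partition index in [0, numPartitions) for the given record.
    static int partition(String topic, Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if ("customer-events".equals(topic)) {
            // Hypothetical rule: keys look like "<customerId>:<eventId>", and all events
            // for one customer must share a partition, so hash only the customerId part.
            String k = String.valueOf(key);
            int sep = k.indexOf(':');
            String customerId = sep >= 0 ? k.substring(0, sep) : k;
            return Math.floorMod(customerId.hashCode(), numPartitions);
        }
        // Fallback for every other topic (other sinks, stores, metrics, ...).
        // NOTE: hashCode-based for illustration only; Kafka's default partitioner
        // hashes the serialized key bytes with murmur2, so results will differ.
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }
}
```

One possible wiring (an assumption, not shown above) is a thin `org.apache.kafka.clients.producer.Partitioner` implementation, registered via `ProducerConfig.PARTITIONER_CLASS_CONFIG`, whose `partition(...)` delegates to `ClusterPartitioning.partition(topic, key, cluster.partitionsForTopic(topic).size())`. As long as all applications ship the same version of the class, they cannot disagree about where a given key lands on a given topic.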
If a user wants to customize a partitioning scheme, he/she also wants to manage it in some central place, maybe a code repo or a jar file. All applications must use the same logic; otherwise data will be messed up. Thus, a single class representing all partitioning logic is not a bad thing at all. (Code-organization-wise, of course, not all of the logic needs to live in that single class.)

On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch <rha...@gmail.com> wrote:
> Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a
> PR with the proposed change.
>
> Thanks!
>
> On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)
> wrote:
>
> Thanks!
>
> On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch <rha...@gmail.com> wrote:
> Ok, cool. I agree we want something simple. I'll create an issue and
> create a pull request with a proposal. Look for it tomorrow.
>
> On Oct 13, 2015, at 10:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> I see your point. Yeah I think it is a good way to add a Partitioner into
> addSink(...) but the Partitioner interface in producer is a bit overkill:
>
> "partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
> valueBytes, Cluster cluster)"
>
> whereas for us we only want to partition on (K key, V value).
>
> Perhaps we should add a new Partitioner interface in Kafka Streams?
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:
> This overrides the partitioning logic for all topics, right? That means I
> have to explicitly call the default partitioning logic for all topics
> except those that my Producer forwards. I guess the best way to do that is
> by extending org.apache.kafka.clients.producer.internals.DefaultPartitioner.
> Of course, with multiple sinks in my topology, I have to put all of the
> partitioning logic inside a single class.
>
> What would you think about adding an overloaded TopologyBuilder.addSink(…)
> method that takes a Partitioner (or better yet a smaller functional
> interface). The resulting SinkProcessor could use that Partitioner instance
> to set the partition number? That’d be super convenient for users, would
> keep the logic where it belongs (where the topology defines the sinks), and
> best of all the implementations won't have to worry about any other topics,
> such as those used by stores, metrics, or other sinks.
>
> Best regards,
>
> Randall
>
> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
> wrote:
>
> Hi Randall,
>
> You can try to set the partitioner class as
> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its interface
> can be found in
>
> org.apache.kafka.clients.producer.Partitioner
>
> Let me know if it works for you.
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:
>
> > The new streams API added with KIP-28 is great. I’ve been using it on a
> > prototype for a few weeks, and I’m looking forward to it being included in
> > 0.9.0. However, at the moment, a Processor implementation is not able to
> > specify the partition number when it outputs messages.
> >
> > I’d be happy to log a JIRA and create a PR to add it to the API, but
> > without knowing all of the history I’m wondering if leaving it out of the
> > API was intentional.
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall Hauch
>
> --
> -- Guozhang
>
> --
> -- Guozhang
>
> --
> -- Guozhang
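For comparison, the per-sink alternative discussed in the thread (a smaller interface that partitions on just `(K key, V value)`, passed to an overloaded `addSink(…)`) could look roughly like the sketch below. `SinkPartitioner` and `PartitionedSink` are hypothetical names, not Kafka Streams classes, and the extra `numPartitions` argument is an assumption: without it, the function has no way to return a valid partition index.

```java
import java.util.Objects;

// Hypothetical per-sink partitioner: much smaller than the producer's Partitioner,
// it sees only the record's key and value. numPartitions is an assumed extra argument.
@FunctionalInterface
interface SinkPartitioner<K, V> {
    int partition(K key, V value, int numPartitions);
}

// Hypothetical sink node that asks its own partitioner where each record should go.
final class PartitionedSink<K, V> {
    private final String topic;
    private final int numPartitions;
    private final SinkPartitioner<K, V> partitioner;

    PartitionedSink(String topic, int numPartitions, SinkPartitioner<K, V> partitioner) {
        this.topic = Objects.requireNonNull(topic);
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
        this.partitioner = Objects.requireNonNull(partitioner);
    }

    // Computes the target partition for one record and rejects out-of-range results.
    int targetPartition(K key, V value) {
        int p = partitioner.partition(key, value, numPartitions);
        if (p < 0 || p >= numPartitions) {
            throw new IllegalStateException("partition " + p + " out of range for topic " + topic);
        }
        return p;
    }
}
```

Here the partitioner only ever sees records for its own sink's topic, which is the convenience Randall describes. The trade-off raised at the top of this message still applies, though: if two sinks write to the same topic with different partitioners, the same key can end up on different partitions.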