A partitioning scheme should be a cluster wide thing. Letting each sink
have a different partitioning scheme does not make sense to me. A
partitioning scheme is not specific to a stream job, each task or a sink. I
think specifying it at sink level is more error prone.

If a user wants to customize a partitioning scheme, he/she also want to
manage it at some central place, maybe a code repo, or a jar file. All
application must use the same logic, otherwise data will be messed up.
Thus, a single class representing all partitioning logic is not a bad thing
at all. (The code organization wise, all logic does not necessarily in the
single class, of course.)


On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch <rha...@gmail.com> wrote:

> Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a
> PR with the proposed change.
>
> Thanks!
>
>
> On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)
> wrote:
>
> Thanks!
>
> On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch <rha...@gmail.com> wrote:
> Ok, cool. I agree we want something simple.  I'll create an issue and
> create a pull request with a proposal. Look for it tomorrow.
>
> On Oct 13, 2015, at 10:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> I see your point. Yeah I think it is a good way to add a Partitioner into
> addSink(...) but the Partitioner interface in producer is a bit overkill:
>
> "partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
> valueBytes, Cluster cluster)"
>
> whereas for us we only want to partition on (K key, V value).
>
> Perhaps we should add a new Partitioner interface in Kafka Streams?
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:
> This overrides the partitioning logic for all topics, right? That means I
> have to explicitly call the default partitioning logic for all topics
> except those that my Producer forwards. I’m guess the best way to do by
> extending org.apache.kafka.clients.producer.DefaultProducer. Of course,
> with multiple sinks in my topology, I have to put all of the partitioning
> logic inside a single class.
>
> What would you think about adding an overloaded TopologyBuilder.addSink(…)
> method that takes a Partitioner (or better yet a smaller functional
> interface). The resulting SinkProcessor could use that Partitioner instance
> to set the partition number? That’d be super convenient for users, would
> keep the logic where it belongs (where the topology defines the sinks), and
> best of all the implementations won't have to worry about any other topics,
> such as those used by stores, metrics, or other sinks.
>
> Best regards,
>
> Randall
>
>
> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
> wrote:
>
> Hi Randall,
>
> You can try to set the partitioner class as
> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its interface
> can be found in
>
> org.apache.kafka.clients.producer.Partitioner
>
> Let me know if it works for you.
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:
>
> > The new streams API added with KIP-28 is great. I’ve been using it on a
> > prototype for a few weeks, and I’m looking forward to it being included
> in
> > 0.9.0. However, at the moment, a Processor implementation is not able to
> > specify the partition number when it outputs messages.
> >
> > I’d be happy to log a JIRA and create a PR to add it to the API, but
> > without knowing all of the history I’m wondering if leaving it out of the
> > API was intentional.
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall Hauch
> >
>
>
>
> --
> -- Guozhang
>
>
>
> --
> -- Guozhang
>
>
>
> --
> -- Guozhang
>

Reply via email to