Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a PR with 
the proposed change.

Thanks!


On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com) wrote:

Thanks!

On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch <rha...@gmail.com> wrote:
Ok, cool. I agree we want something simple.  I'll create an issue and create a 
pull request with a proposal. Look for it tomorrow. 

On Oct 13, 2015, at 10:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I see your point. Yeah, I think adding a Partitioner to addSink(...) is a good 
approach, but the producer's Partitioner interface is a bit overkill:

"partition(String topic, Object key, byte[] keyBytes, Object value, byte[] 
valueBytes, Cluster cluster)"

whereas for us we only want to partition on (K key, V value).

Perhaps we should add a new Partitioner interface in Kafka Streams?
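
A minimal sketch of what such a narrower interface might look like. The names KeyValuePartitioner and byKeyHash are hypothetical and not part of Kafka, and the numPartitions parameter is an added assumption (the caller would have to supply the partition count of the sink topic):

```java
/** Hypothetical interface, not part of Kafka: partitions on the typed key and value only. */
@FunctionalInterface
interface KeyValuePartitioner<K, V> {
    /**
     * @param numPartitions partition count of the target topic (assumed to be supplied by the caller)
     * @return a partition number in the range [0, numPartitions)
     */
    int partition(K key, V value, int numPartitions);
}

class KeyValuePartitionerExample {
    /** Illustrative partitioner that hashes the key; a null key maps to partition 0. */
    static <K, V> KeyValuePartitioner<K, V> byKeyHash() {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return (key, value, numPartitions) ->
                Math.floorMod(key == null ? 0 : key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        KeyValuePartitioner<String, Integer> partitioner = byKeyHash();
        // The value is ignored by this particular partitioner.
        System.out.println(partitioner.partition("order-42", 7, 4));
    }
}
```

Compared with the producer's six-argument method, an implementation here never sees the topic name, the serialized bytes, or the Cluster.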

Guozhang

On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:
This overrides the partitioning logic for all topics, right? That means I have 
to explicitly call the default partitioning logic for all topics except those 
that my Producer forwards. I’m guessing the best way to do that is by extending 
org.apache.kafka.clients.producer.internals.DefaultPartitioner. Of course, with multiple 
sinks in my topology, I have to put all of the partitioning logic inside a 
single class.

What would you think about adding an overloaded TopologyBuilder.addSink(…) 
method that takes a Partitioner (or better yet a smaller functional interface)? 
The resulting SinkProcessor could use that Partitioner instance to set the 
partition number. That’d be super convenient for users, would keep the logic 
where it belongs (where the topology defines the sinks), and best of all the 
implementations won't have to worry about any other topics, such as those used 
by stores, metrics, or other sinks.
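
To illustrate the idea, here is a toy model of per-sink partitioning. ToyTopology and SinkPartitioner are hypothetical stand-ins, not the real TopologyBuilder or SinkProcessor; the sketch only shows that each sink can carry its own partitioning logic while every other topic keeps the default behavior:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the smaller functional interface suggested above. */
@FunctionalInterface
interface SinkPartitioner {
    int partition(Object key, Object value, int numPartitions);
}

/** Toy model of a topology; this is NOT the real TopologyBuilder. */
class ToyTopology {
    private final Map<String, SinkPartitioner> partitioners = new HashMap<>();

    /** Registers a sink that leaves partitioning to the default logic. */
    ToyTopology addSink(String topic) {
        return this;
    }

    /** Overload that attaches partitioning logic to this sink only. */
    ToyTopology addSink(String topic, SinkPartitioner partitioner) {
        partitioners.put(topic, partitioner);
        return this;
    }

    /** Returns the chosen partition, or null to mean "use the default partitioning". */
    Integer partitionFor(String topic, Object key, Object value, int numPartitions) {
        SinkPartitioner partitioner = partitioners.get(topic);
        if (partitioner == null) {
            return null;
        }
        return partitioner.partition(key, value, numPartitions);
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology()
                .addSink("audit-log")
                .addSink("orders-by-customer",
                        (key, value, n) -> Math.floorMod(key.hashCode(), n));

        System.out.println(topology.partitionFor("audit-log", "c-17", "created", 8));
        System.out.println(topology.partitionFor("orders-by-customer", "c-17", "created", 8));
    }
}
```

The partitioner registered for one sink is never consulted for any other topic, so it needs no fallback to the default logic.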

Best regards,

Randall


On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) wrote:

Hi Randall,

You can try setting the partitioner class via
ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; its interface
can be found in

org.apache.kafka.clients.producer.Partitioner
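
A minimal sketch of that setting using plain java.util.Properties. The key string "partitioner.class" is the value of ProducerConfig.PARTITIONER_CLASS_CONFIG; com.example.MyPartitioner is a hypothetical class assumed to implement the interface above, and how the properties are then handed to StreamsConfig is not shown:

```java
import java.util.Properties;

class PartitionerConfigSketch {
    // Same string as ProducerConfig.PARTITIONER_CLASS_CONFIG in the producer client.
    static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";

    /** Builds properties that name a custom partitioner class for the producer. */
    static Properties withPartitioner(String partitionerClassName) {
        Properties props = new Properties();
        props.setProperty(PARTITIONER_CLASS_CONFIG, partitionerClassName);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical class name; it would need to be on the classpath at runtime.
        Properties props = withPartitioner("com.example.MyPartitioner");
        System.out.println(props.getProperty(PARTITIONER_CLASS_CONFIG));
    }
}
```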

Let me know if it works for you.

Guozhang

On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:

> The new streams API added with KIP-28 is great. I’ve been using it on a
> prototype for a few weeks, and I’m looking forward to it being included in
> 0.9.0. However, at the moment, a Processor implementation is not able to
> specify the partition number when it outputs messages.
>
> I’d be happy to log a JIRA and create a PR to add it to the API, but
> without knowing all of the history I’m wondering if leaving it out of the
> API was intentional.
>
> Thoughts?
>
> Best regards,
>
> Randall Hauch
>



--
-- Guozhang


