I think part of Yasu's motivation for cluster-wide partitioning is that,
for example, there could be multiple stream jobs reading from / writing to
some shared topics but controlled by different teams or services inside an
organization, and if one team mistakenly specifies the partitioning in the
wrong way, it will interfere with the other teams; hence global management
of the partitioning scheme may be required, just like a global schema
registry service for Kafka.

With the producer's Partitioner interface, we can still implement different
partitioning schemes for different topics in a single class: just branch on
the topic name and cast the key and value types. But that would be a bit
awkward, so I would prefer a customizable partitioner in the sink spec for
better user programmability.
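
To make this concrete, here is a rough sketch of that single-class approach
against the producer's Partitioner interface (the topic names and key types
are made up for illustration):

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class AllTopicsPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            switch (topic) {
                case "page-views":  // hypothetical topic keyed by user id (String)
                    return (((String) key).hashCode() & 0x7fffffff) % numPartitions;
                case "orders":      // hypothetical topic keyed by order id (Long)
                    return (int) ((((Long) key) & 0x7fffffffL) % numPartitions);
                default:            // fall back to hashing the serialized key bytes
                    return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
            }
        }

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public void close() {}
    }

Every topic's logic funnels through this one class and its casts, which is
the awkwardness described above.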

Guozhang

On Wed, Oct 14, 2015 at 1:03 PM, Randall Hauch <rha...@gmail.com> wrote:

> It absolutely is important that the partitioning logic for a single topic
> be the same across an entire cluster. IOW, if a topology has a single sink,
> then no matter where that topology is run in the cluster, it had better use
> the same partitioning logic. I would argue that when the partitioning logic
> varies from the default logic, it’s far better to encapsulate it within the
> topology’s definition, and adding it to the sink is a very easy way to do
> this (and very natural for the developer using Kafka Streams).
>
> However, centralizing the partitioning logic for all streams is certainly
> not ideal, primarily because different topics will likely need to be
> partitioned in different ways. This is especially true for stateful stream
> processing, which depends on messages with the same key going to the same
> processor instance that owns that keyed data. IOW, the partitioning logic
> used by a producer is strongly informed by how the *downstream stateful
> consumers* are organized/clustered. It gets far more complicated when
> considering built-in topics used by offset management, state storage, and
> metrics.
>
> The bottom line is that *different* topics will likely need to be
> partitioned differently.
>
> On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda (yasuhiro.mats...@gmail.com) wrote:
>
> A partitioning scheme should be a cluster-wide thing. Letting each sink
> have a different partitioning scheme does not make sense to me. A
> partitioning scheme is not specific to a stream job, a task, or a sink. I
> think specifying it at the sink level is more error prone.
>
> If a user wants to customize a partitioning scheme, he/she will also want
> to manage it in some central place, maybe a code repo or a jar file. All
> applications must use the same logic; otherwise the data will be messed
> up. Thus, a single class representing all partitioning logic is not a bad
> thing at all. (Code-organization-wise, the logic does not necessarily have
> to live in a single class, of course.)
>
>
> On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch <rha...@gmail.com> wrote:
>
> > Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a
> > PR with the proposed change.
> >
> > Thanks!
> >
> >
> > On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)
> > wrote:
> >
> > Thanks!
> >
> > On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch <rha...@gmail.com> wrote:
> > Ok, cool. I agree we want something simple. I'll create an issue and
> > create a pull request with a proposal. Look for it tomorrow.
> >
> > On Oct 13, 2015, at 10:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > I see your point. Yeah, I think it is a good way to add a Partitioner
> > into addSink(...), but the Partitioner interface in the producer is a
> > bit overkill:
> >
> > "partition(String topic, Object key, byte[] keyBytes, Object value,
> > byte[] valueBytes, Cluster cluster)"
> >
> > whereas for us we only want to partition on (K key, V value).
> >
> > Perhaps we should add a new Partitioner interface in Kafka Streams?
> >
> > Guozhang
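
For reference, a minimal sketch of what such a Streams-level interface could
look like; the name and exact signature are illustrative, not a committed API:

    public interface StreamPartitioner<K, V> {
        /**
         * Determine the partition for a record, given only its deserialized
         * key and value and the number of partitions of the target topic.
         */
        Integer partition(K key, V value, int numPartitions);
    }

Because it works with typed keys and values, it needs neither the serialized
bytes nor the Cluster metadata that the producer-level interface exposes.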
> >
> > On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:
> > This overrides the partitioning logic for all topics, right? That means
> > I have to explicitly call the default partitioning logic for all topics
> > except those that my Processor forwards. I'm guessing the best way to do
> > that is by extending
> > org.apache.kafka.clients.producer.internals.DefaultPartitioner. Of
> > course, with multiple sinks in my topology, I have to put all of the
> > partitioning logic inside a single class.
> >
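A sketch of that delegation pattern, assuming the default logic lives in
DefaultPartitioner as in recent builds (the sink topic name is hypothetical):

    import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
    import org.apache.kafka.common.Cluster;

    public class SinkAwarePartitioner extends DefaultPartitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            if ("my-sink-topic".equals(topic)) {  // hypothetical sink topic
                int numPartitions = cluster.partitionsForTopic(topic).size();
                return (key.hashCode() & 0x7fffffff) % numPartitions;
            }
            // every other topic keeps the default partitioning logic
            return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
        }
    }
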
> > What would you think about adding an overloaded
> > TopologyBuilder.addSink(...) method that takes a Partitioner (or better
> > yet, a smaller functional interface)? The resulting SinkProcessor could
> > use that Partitioner instance to set the partition number. That'd be
> > super convenient for users, it would keep the logic where it belongs
> > (where the topology defines the sinks), and best of all the
> > implementations won't have to worry about any other topics, such as
> > those used by stores, metrics, or other sinks.
> >
> > Best regards,
> >
> > Randall
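
To sketch the proposal from the caller's side (the overload and the
functional interface shape are hypothetical, and "PROCESS" is assumed to be
an upstream processor in an existing topology):

    // Hypothetical overload on TopologyBuilder:
    //   addSink(String name, String topic, StreamPartitioner<K, V> partitioner,
    //           String... parentNames)
    builder.addSink("SINK", "output-topic",
                    (key, value, numPartitions) -> (key.hashCode() & 0x7fffffff) % numPartitions,
                    "PROCESS");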
> >
> >
> > On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
> > wrote:
> >
> > Hi Randall,
> >
> > You can try to set the partitioner class via
> > ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; its
> > interface can be found in
> >
> > org.apache.kafka.clients.producer.Partitioner
> >
> > Let me know if it works for you.
> >
> > Guozhang
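
For example, wiring a custom producer-level partitioner through StreamsConfig
could look like this (MyPartitioner stands in for any class implementing
org.apache.kafka.clients.producer.Partitioner):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // ... other streams, producer, and consumer settings ...
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
    StreamsConfig config = new StreamsConfig(props);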
> >
> > On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:
> >
> > > The new streams API added with KIP-28 is great. I've been using it on
> > > a prototype for a few weeks, and I'm looking forward to it being
> > > included in 0.9.0. However, at the moment, a Processor implementation
> > > is not able to specify the partition number when it outputs messages.
> > >
> > > I'd be happy to log a JIRA and create a PR to add it to the API, but
> > > without knowing all of the history I'm wondering if leaving it out of
> > > the API was intentional.
> > >
> > > Thoughts?
> > >
> > > Best regards,
> > >
> > > Randall Hauch
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>


-- 
-- Guozhang
