Hi All,


This might be a bit late, but I would like to add a use case to the
partitioning strategy discussion.


I have experience with both the producer and the consumer side, and I have
a different use case for the partition selection strategy.



Problem:

We have a heterogeneous environment of producers (Node.js, Python, new Java
and old Scala-based producers all writing to the same topic).  I have seen
that not all producers employ a round-robin strategy for non-keyed messages
like the new producer does.  This creates non-uniform data ingestion across
partitions and delays overall message processing.

How can we achieve a uniform message injection rate across all partitions?
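
As a rough illustration of the round-robin behaviour mentioned above for
non-keyed messages, here is a minimal Java sketch. The class name
RoundRobinChooser is hypothetical and is not part of any Kafka client; it
only shows the counting logic that some of our producers lack.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: cycles through partitions for messages that carry no key. */
final class RoundRobinChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Returns the next partition index in [0, numPartitions). */
    int next(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser();
        int[] hits = new int[4];
        for (int i = 0; i < 1000; i++) {
            hits[chooser.next(4)]++;
        }
        // Each of the 4 partitions receives 250 of the 1000 messages.
        System.out.println(Arrays.toString(hits));
    }
}
```

A producer that always picks the same partition, or picks at random with a
poor seed, would not give this even spread, which is the skew we observe.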



Proposed Solution:


Let the broker cluster, rather than the producer itself, decide the next
partition of the topic to send data to, and apply more intelligence in
doing so.

1)   When a producer sends data to the brokers, the response
(ProduceResponse) in the Kafka wire protocol would carry a hint telling the
client which partition to send to next, based on the following logic (which
could be customizable):

a.     Based on the overall data injection rate for the topic and the
current producer's injection rate

b.     The ability to rank partitions by consumer rate (an advanced use
case, since there may be many consumers, so a weighted average etc. would
be needed)
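
To make the idea concrete, here is a minimal Java sketch of how a broker
might compute such a hint. Everything in it is an assumption for
illustration: PartitionHintSelector, the per-partition rate arrays and the
consumerWeight parameter do not exist in Kafka, and the scoring formula is
just one simple possibility.

```java
/** Hypothetical broker-side logic; none of these names exist in Kafka. */
final class PartitionHintSelector {
    // Assumed tuning knob: 0.0 ignores consumers, larger values favour
    // partitions that are being drained faster.
    private final double consumerWeight;

    PartitionHintSelector(double consumerWeight) {
        this.consumerWeight = consumerWeight;
    }

    /**
     * Picks the partition with the lowest score, where
     * score = ingest rate - consumerWeight * consume rate.
     * Ties go to the lowest partition index.
     */
    int hint(double[] ingestRate, double[] consumeRate) {
        if (ingestRate.length == 0 || ingestRate.length != consumeRate.length) {
            throw new IllegalArgumentException(
                "rate arrays must be non-empty and equal in length");
        }
        int best = 0;
        double bestScore = Double.POSITIVE_INFINITY;
        for (int p = 0; p < ingestRate.length; p++) {
            double score = ingestRate[p] - consumerWeight * consumeRate[p];
            if (score < bestScore) {
                bestScore = score;
                best = p;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[] ingest = {500.0, 120.0, 300.0};   // messages/sec written
        double[] consume = {450.0, 10.0, 100.0};   // messages/sec read
        // Case (a): injection rate only.
        System.out.println(new PartitionHintSelector(0.0).hint(ingest, consume));
        // Case (b): consumer rate taken into account.
        System.out.println(new PartitionHintSelector(1.0).hint(ingest, consume));
    }
}
```

With a weight of 0.0 the hint simply points at the least-loaded partition;
with a non-zero weight a partition that is written to heavily but also
consumed quickly can still be preferred over a quiet one that is lagging.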



Ultimately, the brokers would coordinate among thousands of producers and
steer data based on injection rate (an out-of-the-box feature) and
consumption rate (a pluggable interface implementation on the brokers’
side).  The goal here is to attain uniformity and/or lower delivery delay
to consumers.  This is similar to consumer coordination moving to the
brokers: producer-side partition selection would also move to the brokers.
This would benefit both Java and non-Java clients.



Please let me know your feedback on this proposal.





Thanks,


Bhavesh

On Mon, May 18, 2015 at 7:25 AM, Sriharsha Chintalapani <
harsh...@fastmail.fm> wrote:

> Gianmarco,
>             I’ll send the patch soon.
> Thanks,
> Harsha
>
>
> On May 18, 2015 at 1:34:50 AM, Gianmarco De Francisci Morales (
> g...@apache.org) wrote:
>
> Hi,
>
> If everything is in order, can we proceed to implement it?
>
> Cheers,
>
> --
> Gianmarco
>
> On 13 May 2015 at 03:06, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
> > Hi Harsha,
> >
> > If you open this link
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > All the KIPs are the child page of this page which you can see from the
> > left bar. Only KIP-22 is missing. It looks you created it as a child page
> > of
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Index
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On 5/12/15, 3:12 PM, "Sriharsha Chintalapani" <ka...@harsha.io> wrote:
> >
> > >Hi Jiangjie,
> > > Its under
> > >
> >
> > >https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22++Expose+a+Partitioner+interface+in+the+new+producer
> > >I checked other KIPS they are under /KAFKA as well.
> > >
> > >Thanks,
> > >Harsha
> > >On May 12, 2015 at 2:12:30 PM, Jiangjie Qin (j...@linkedin.com.invalid)
> > >wrote:
> > >
> > >Hey Harsha,
> > >
> > >It looks you created the KIP page at wrong place. . . Can you move the
> > >page to a child page of
> > >
> >
> > >https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > >
> > >Thanks.
> > >
> > >Jiangjie (Becket) Qin
> > >
> > >On 5/6/15, 6:12 PM, "Harsha" <ka...@harsha.io> wrote:
> > >
> > >>Thanks for the review Joel. I agree don't need a init method we can use
> > >>configure. I'll update the KIP.
> > >>-Harsha
> > >>
> > >>On Wed, May 6, 2015, at 04:45 PM, Joel Koshy wrote:
> > >>> +1 with a minor comment: do we need an init method given it extends
> > >>> Configurable?
> > >>>
> > >>> Also, can you move this wiki out of drafts and add it to the table in
> > >>>
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals?
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Joel
> > >>>
> > >>> On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani
> > >>>wrote:
> > >>> > Thanks Jay. I removed partitioner.metadata from KIP. I'll send an
> > >>>updated patch.
> > >>> >
> > >>> > --
> > >>> > Harsha
> > >>> > Sent with Airmail
> > >>> >
> > >>> > On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani
> > >>>(harsh...@fastmail.fm) wrote:
> > >>> >
> > >>> > Thanks for the comments everyone.
> > >>> > Hi Jay,
> > >>> > I do have a question regarding configurable interface on how to
> > >>>pass a Map<String, ?> properties. I couldn't find any other classes
> > >>>using it. JMX reporter overrides it but doesn't implement it. So with
> > >>>configurable partitioner how can a user pass in partitioner
> > >>>configuration since its getting instantiated within the producer.
> > >>> >
> > >>> > Thanks,
> > >>> > Harsha
> > >>> >
> > >>> >
> > >>> > On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com)
> > >>>wrote:
> > >>> >
> > >>> > Hey Harsha,
> > >>> >
> > >>> > That proposal sounds good. One minor thing--I don't think we need
> to
> > >>>
> > >>>have
> > >>> > the partitioner.metadata property. Our reason for using string
> > >>>properties
> > >>> > is exactly to make config extensible at runtime. So a given
> > >>>partitioner can
> > >>> > add whatever properties make sense using the configure() api it
> > >>>defines.
> > >>> >
> > >>> > -Jay
> > >>> >
> > >>> > On Sun, May 3, 2015 at 5:57 PM, Harsha <ka...@harsha.io> wrote:
> > >>> >
> > >>> > > Thanks Jay & Gianmarco for the comments. I picked the option A,
> if
> > >>>
> > >>>user
> > >>> > > sends a partition id then it will be applied, and the partitioner.class
> > >>>method
> > >>> > > will only be called if the partition id is null.
> > >>> > > Please take a look at the updated KIP here
> > >>> > >
> > >>> > >
> > >>>
> > >>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
> > >>> > > . Let me know if you see anything missing.
> > >>> > >
> > >>> > > Thanks,
> > >>> > > Harsha
> > >>> > >
> > >>> > > On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales
> > >>>wrote:
> > >>> > > > Hi,
> > >>> > > >
> > >>> > > >
> > >>> > > > Here are the questions I think we should consider:
> > >>> > > > > 1. Do we need this at all given that we have the partition
> > >>>argument in
> > >>> > > > > ProducerRecord which gives full control? I think we do need
> it
> > >>>
> > >>>because
> > >>> > > this
> > >>> > > > > is a way to plug in a different partitioning strategy at run
> > >>>time and
> > >>> > > do it
> > >>> > > > > in a fairly transparent way.
> > >>> > > > >
> > >>> > > >
> > >>> > > > Yes, we need it if we want to support different partitioning
> > >>>strategies
> > >>> > > > inside Kafka rather than requiring the user to code them
> > >>>externally.
> > >>> > > >
> > >>> > > >
> > >>> > > > > 3. Do we need to add the value? I suspect people will have
> > >>>uses
> > >>>for
> > >>> > > > > computing something off a few fields in the value to choose
> > >>>the
> > >>> > > partition.
> > >>> > > > > This would be useful in cases where the key was being used
> for
> > >>>
> > >>>log
> > >>> > > > > compaction purposes and did not contain the full information
> > >>>for
> > >>> > > computing
> > >>> > > > > the partition.
> > >>> > > > >
> > >>> > > >
> > >>> > > > I am not entirely sure about this. I guess that most
> > >>>partitioners
> > >>>should
> > >>> > > > not use it.
> > >>> > > > I think it makes it easier to reason about the system if the
> > >>>partitioner
> > >>> > > > only works on the key.
> > >>> > > > However, if the value (and its serialization) are already
> > >>>available, there
> > >>> > > > is not much harm in passing them along.
> > >>> > > >
> > >>> > > >
> > >>> > > > > 4. This interface doesn't include either an init() or close()
> > >>>method.
> > >>> > > It
> > >>> > > > > should implement Closeable and Configurable, right?
> > >>> > > > >
> > >>> > > >
> > >>> > > > Right now the only application I can think of to have an init()
> > >>>and
> > >>> > > > close()
> > >>> > > > is to read some state information (e.g., load information) that
> > >>>is
> > >>> > > > published on some external distributed storage (e.g.,
> zookeeper)
> > >>>
> > >>>by the
> > >>> > > > brokers.
> > >>> > > > It might be useful also for reconfiguration and state
> migration.
> > >>>
> > >>> > > >
> > >>> > > > I think it's not a very common use case right now, but if the
> > >>>added
> > >>> > > > complexity is not too much it might be worth to have support
> for
> > >>>
> > >>>these
> > >>> > > > methods.
> > >>> > > >
> > >>> > > >
> > >>> > > >
> > >>> > > > > 5. What happens if the user both sets the partition id in the
> > >>> > > > > ProducerRecord and sets a partitioner? Does the partition id
> > >>>just get
> > >>> > > > > passed in to the partitioner (as sort of implied in this
> > >>>interface?).
> > >>> > > This
> > >>> > > > > is a bit weird since if you pass in the partition id you kind
> > >>>of
> > >>> > > expect it
> > >>> > > > > to get used, right? Or is it the case that if you specify a
> > >>>partition
> > >>> > > the
> > >>> > > > > partitioner isn't used at all (in which case no point in
> > >>>including
> > >>> > > > > partition in the Partitioner api).
> > >>> > > > >
> > >>> > > > >
> > >>> > > > The user should be able to override the partitioner on a
> > >>>per-record basis
> > >>> > > > by explicitly setting the partition id.
> > >>> > > > I don't think it makes sense for the partitioners to take
> > >>>"hints"
> > >>>on the
> > >>> > > > partition.
> > >>> > > >
> > >>> > > > I would even go the extra step, and have a default logic that
> > >>>accepts
> > >>> > > > both
> > >>> > > > key and partition id (current interface) and calls partition()
> > >>>only if
> > >>> > > > the
> > >>> > > > partition id is not set. The partition() method does *not* take
> > >>>the
> > >>> > > > partition ID as input (only key-value).
> > >>> > > >
> > >>> > > >
> > >>> > > > Cheers,
> > >>> > > > --
> > >>> > > > Gianmarco
> > >>> > > >
> > >>> > > >
> > >>> > > >
> > >>> > > > > Cheers,
> > >>> > > > >
> > >>> > > > > -Jay
> > >>> > > > >
> > >>> > > > > On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani <
> > >>> > > ka...@harsha.io>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > > Hi,
> > >>> > > > > > Here is the KIP for adding a partitioner interface for
> > >>> > > producer.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > >
> > >>>
> > >>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
> > >>> > > > > > There is one open question about how interface should look
> > >>>like.
> > >>> > > Please
> > >>> > > > > > take a look and let me know if you prefer one way or the
> > >>>other.
> > >>> > > > > >
> > >>> > > > > > Thanks,
> > >>> > > > > > Harsha
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > >
> > >>>
> > >
> >
> >
>
