Yes, let's describe that behavior in the FAQ.

Thanks,

Jun


On Tue, Oct 1, 2013 at 8:35 AM, Joe Stein <crypt...@gmail.com> wrote:

> Agreed, let's hold off until after 0.8.
>
> I will update the JIRA ticket I created with your feedback and the options, so
> we can discuss it there and then deal with changes in 0.8.1, 0.9, or later.
>
> I will update the FAQ (should have time tomorrow unless someone else gets
> to it first). I think we should have it in there at the least, yes?
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
>
> On Tue, Oct 1, 2013 at 11:26 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > This proposal still doesn't address the following fundamental issue: The
> > random partitioner cannot select a random and AVAILABLE partition.
> >
> > So, we have the following two choices.
> >
> > 1. Stick with the current partitioner api.
> > Then, we have to pick one way to do random partitioning (when key is null).
> > The current behavior may not be very intuitive, but is one of the possible
> > behaviors in 0.7.
> >
> > 2. Change the partitioner api so that we can (1) be aware of available
> > partitions and (2) have pluggable partitioners for doing random
> > distribution.
> >
> > Option (2) is probably the right approach. However, it's a non-trivial
> > change. So, I am not sure if it should be done in 0.8 or not.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Sep 30, 2013 at 10:21 PM, Joe Stein <crypt...@gmail.com> wrote:
> >
> > > How about making UUID.randomUUID.toString() the default in KeyedMessage
> > > instead of null if not supplied
> > >
> > > def this(topic: String, message: V) = this(topic, UUID.randomUUID.toString(), message)
> > >
> > > and if you want the random refresh behavior then pass in "*" on the
> > > KeyedMessage construction, which we can then check for later in
> > > DefaultEventHandler
> > >
> > >  val partition =
> > >       if(key =="*") {
> > >
> > > we then throw NPE if key == null in KeyedMessage, like we do for topic
> > >
> > > I believe any null flow control logic is something to shy away from
> > >
> > > If this is wrong, too much, or still not the best solution, we could also
> > > hold over and just put this in the FAQ with the JIRA, and let people know
> > > that when they run into this and want to randomize (in development / testing,
> > > and in many production situations where the producer count is not large
> > > enough) they have to pass in their own continuously random key. If we can get
> > > a consensus on what we want to do with minimal changes, then I think it is
> > > important for 0.8; otherwise wait.
> > >
> > > On Sun, Sep 29, 2013 at 12:14 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > The main issue is that if we do that, when key is null, we can only select
> > > > a random partition, but not a random and available partition, without
> > > > changing the partitioner api. Being able to do the latter is important in
> > > > my opinion. For example, a user may choose the replication factor of a
> > > > topic to be 1. If a broker is down, it's much better to select partitions
> > > > on other brokers for producing than losing messages.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Sat, Sep 28, 2013 at 9:51 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > I think Joe's suggesting that we can remove the checking logic for
> > > > > key == null in DefaultEventHandler, and do that in the partitioner.
> > > > >
> > > > > One consequence of this idea is that any customized partitioner then also
> > > > > has to handle the key == null case.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Sep 27, 2013 at 9:12 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > We have the following code in DefaultEventHandler:
> > > > > >
> > > > > >     val partition =
> > > > > >       if(key == null) {
> > > > > >         // If the key is null, we don't really need a partitioner
> > > > > >         // So we look up in the send partition cache for the topic to decide the target partition
> > > > > >         val id = sendPartitionPerTopicCache.get(topic)
> > > > > >         id match {
> > > > > >           case Some(partitionId) =>
> > > > > >             // directly return the partitionId without checking availability of the leader,
> > > > > >             // since we want to postpone the failure until the send operation anyways
> > > > > >             partitionId
> > > > > >           case None =>
> > > > > >             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> > > > > >             if (availablePartitions.isEmpty)
> > > > > >               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
> > > > > >             val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
> > > > > >             val partitionId = availablePartitions(index).partitionId
> > > > > >             sendPartitionPerTopicCache.put(topic, partitionId)
> > > > > >             partitionId
> > > > > >         }
> > > > > >       } else
> > > > > >         partitioner.partition(key, numPartitions)
> > > > > >
> > > > > > So, if key is null, the partitioner is ignored.
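For illustration, the null-key branch quoted above can be modeled in isolation. This is only a minimal sketch under stated assumptions, not the Kafka code: PartitionInfo, StickySelector, and onMetadataRefresh are hypothetical names, and the sketch assumes the per-topic cache is cleared whenever metadata is refreshed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the producer's per-topic partition metadata.
final case class PartitionInfo(partitionId: Int, leaderBrokerIdOpt: Option[Int])

// Simplified model of the null-key branch: pick one available partition,
// cache it per topic, and reuse it until the cache is cleared.
class StickySelector {
  private val cache = mutable.Map.empty[String, Int]
  private var counter = 0

  def select(topic: String, partitions: Seq[PartitionInfo]): Int =
    cache.get(topic) match {
      case Some(id) =>
        id // reused as-is; leader availability is not re-checked here
      case None =>
        val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
        if (available.isEmpty)
          throw new IllegalStateException("No leader for any partition in topic " + topic)
        val id = available(counter % available.size).partitionId
        counter += 1
        cache.put(topic, id)
        id
    }

  // Assumption: invoked on each metadata refresh, which ends the sticky period.
  def onMetadataRefresh(): Unit = cache.clear()
}

object StickySelectorDemo {
  val partitions = Seq(
    PartitionInfo(0, Some(1)),
    PartitionInfo(1, None), // no leader, so never selected
    PartitionInfo(2, Some(2))
  )

  // Returns the set of partitions used before a refresh and the one chosen after it.
  def run(): (Set[Int], Int) = {
    val selector = new StickySelector
    val beforeRefresh = (1 to 100).map(_ => selector.select("t", partitions)).toSet
    selector.onMetadataRefresh()
    val afterRefresh = selector.select("t", partitions)
    (beforeRefresh, afterRefresh)
  }
}
```

In this model a single producer sends all 100 keyless messages to one partition, and only moves to another available partition after the refresh, which is the behavior under discussion.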
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Fri, Sep 27, 2013 at 10:30 AM, Joe Stein <crypt...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > hmmm, yeah, no, I don't want to do that ... if we don't have to.
> > > > > > >
> > > > > > > What if the DefaultPartitioner code looked like this instead =8^)
> > > > > > >
> > > > > > > private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> > > > > > >
> > > > > > >   def partition(key: T, numPartitions: Int): Int = {
> > > > > > >     if (key == null) {
> > > > > > >         import java.util.UUID
> > > > > > >         // no key supplied: hash a fresh random UUID string to pick a partition
> > > > > > >         Utils.abs(UUID.randomUUID.toString().hashCode) % numPartitions
> > > > > > >     }
> > > > > > >     else {
> > > > > > >        Utils.abs(key.hashCode) % numPartitions
> > > > > > >     }
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > Again, the goal here is simplicity (often the initial, dev-side, up and
> > > > > > > running out of the box experience), so folks don't have to randomize the
> > > > > > > keys themselves to get this effect.
> > > > > > >
> > > > > > > We would still also have to have a RandomMetaRefreshPartitioner class,
> > > > > > > right? So null keys there would wait for the timed refresh for that use
> > > > > > > case, right?
> > > > > > >
> > > > > > > private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> > > > > > >
> > > > > > >   def partition(key: T, numPartitions: Int): Int = {
> > > > > > >     Utils.abs(key.hashCode) % numPartitions
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <jun...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > > However, currently, if key is null, the partitioner is not even called.
> > > > > > > > Do you want to change DefaultEventHandler too?
> > > > > > > >
> > > > > > > > This also doesn't allow the partitioner to select a random and available
> > > > > > > > partition, which in my opinion is more important than making partitions
> > > > > > > > perfectly evenly balanced.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <
> crypt...@gmail.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > What I was proposing was twofold
> > > > > > > > >
> > > > > > > > > 1) revert the DefaultPartitioner class
> > > > > > > > >
> > > > > > > > > then
> > > > > > > > >
> > > > > > > > > 2) create a new partitioner that folks could use (like at LinkedIn you
> > > > > > > > > would use this partitioner instead) in ProducerConfig
> > > > > > > > >
> > > > > > > > > private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> > > > > > > > >   private val random = new java.util.Random
> > > > > > > > >
> > > > > > > > >   def partition(key: T, numPartitions: Int): Int = {
> > > > > > > > >     Utils.abs(key.hashCode) % numPartitions
> > > > > > > > >   }
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <
> jun...@gmail.com>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Joe,
> > > > > > > > > >
> > > > > > > > > > Not sure I fully understand your proposal. Do you want to put the random
> > > > > > > > > > partitioning selection logic (for messages without a key) in the
> > > > > > > > > > partitioner without changing the partitioner api? That's difficult. The
> > > > > > > > > > issue is that in the current partitioner api, we don't know which
> > > > > > > > > > partitions are available. For example, if we have replication factor 1 on a
> > > > > > > > > > topic and a broker is down, the best thing to do for the random partitioner
> > > > > > > > > > is to select an available partition at random (assuming more than 1
> > > > > > > > > > partition is created for the topic).
> > > > > > > > > >
> > > > > > > > > > Another option is to revert the random partitioning selection logic in
> > > > > > > > > > DefaultEventHandler to select a random partition per batch of events
> > > > > > > > > > (instead of sticking with a random partition for some configured amount
> > > > > > > > > > of time). This is doable, but I am not sure if it's that critical.
> > > > > > > > > > Since this is one of the two possible behaviors in 0.7, it's hard to say
> > > > > > > > > > whether people will be surprised by that. Preserving both behaviors in 0.7
> > > > > > > > > > will require changing the partitioner api. This is more work and I agree
> > > > > > > > > > it's better to do this post 0.8.0 final.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <
> > > crypt...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Jun, can we hold this extra change over for 0.8.1 and just go with
> > > > > > > > > > > reverting to where we were before for the default, with a new partitioner
> > > > > > > > > > > for meta refresh, and support both?
> > > > > > > > > > >
> > > > > > > > > > > I am not sure I entirely understand why someone would need the extra
> > > > > > > > > > > functionality you are talking about, which sounds cool though... adding it
> > > > > > > > > > > to the API (especially now) without people using it may just make folks ask
> > > > > > > > > > > more questions and maybe not use it ... IDK ... but in any case we can work
> > > > > > > > > > > on buttoning up 0.8 and shipping just the change for two partitioners
> > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1067 and circle back, if we
> > > > > > > > > > > want, on this extra item (including the discussion) in 0.8.1 or greater.
> > > > > > > > > > > I am always of a mind to reduce complexity unless that complexity is in
> > > > > > > > > > > fact better than not having it.
> > > > > > > > > > >
> > > > > > > > > > > On Sun, Sep 22, 2013 at 8:56 PM, Jun Rao <
> > jun...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > It's reasonable to make the behavior of random producers customizable
> > > > > > > > > > > > through a pluggable partitioner. So, if one doesn't care about # of socket
> > > > > > > > > > > > connections, one can choose to select a random partition on every send. If
> > > > > > > > > > > > one does have many producers, one can choose to periodically select a
> > > > > > > > > > > > random partition. To support this, the partitioner api needs to be changed
> > > > > > > > > > > > though.
> > > > > > > > > > > >
> > > > > > > > > > > > Instead of
> > > > > > > > > > > >   def partition(key: T, numPartitions: Int): Int
> > > > > > > > > > > >
> > > > > > > > > > > > we probably need something like the following:
> > > > > > > > > > > >   def partition(key: T, numPartitions: Int, availablePartitionList: List[Int],
> > > > > > > > > > > >                 isNewBatch: Boolean, isRefreshMetadata: Boolean): Int
> > > > > > > > > > > >
> > > > > > > > > > > > availablePartitionList: allows us to select only partitions that are
> > > > > > > > > > > > available.
> > > > > > > > > > > > isNewBatch: allows us to select the same partition for all messages in a
> > > > > > > > > > > > given batch in the async mode.
> > > > > > > > > > > > isRefreshMetadata: allows us to implement the policy of switching to a
> > > > > > > > > > > > random partition periodically.
> > > > > > > > > > > >
> > > > > > > > > > > > This will make the partitioner api a bit more complicated. However, it
> > > > > > > > > > > > does provide enough information for customization.
> > > > > > > > > > > >
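As a rough illustration of how the extended signature proposed above might be used, here is a minimal sketch. The trait AvailabilityAwarePartitioner and the class RandomAvailablePartitioner are hypothetical names invented for this example and are not part of any Kafka release; the perBatch flag is likewise an assumption used to show both policies mentioned in the thread.

```scala
import scala.util.Random

// Hypothetical trait mirroring the signature proposed in the thread.
trait AvailabilityAwarePartitioner[T] {
  def partition(key: T,
                numPartitions: Int,
                availablePartitionList: List[Int],
                isNewBatch: Boolean,
                isRefreshMetadata: Boolean): Int
}

// Picks a random partition among the available ones and sticks to it.
// perBatch = true  : re-pick at the start of every batch.
// perBatch = false : re-pick only when metadata has been refreshed.
class RandomAvailablePartitioner[T](perBatch: Boolean, random: Random = new Random)
    extends AvailabilityAwarePartitioner[T] {

  private var current: Option[Int] = None

  override def partition(key: T,
                         numPartitions: Int,
                         availablePartitionList: List[Int],
                         isNewBatch: Boolean,
                         isRefreshMetadata: Boolean): Int = {
    require(availablePartitionList.nonEmpty, "no available partitions")
    val repick = if (perBatch) isNewBatch else isRefreshMetadata
    // Also re-pick if the cached partition is missing or no longer available.
    val stillAvailable = current.exists(p => availablePartitionList.contains(p))
    if (repick || !stillAvailable)
      current = Some(availablePartitionList(random.nextInt(availablePartitionList.size)))
    current.get
  }
}

object PartitionerDemo {
  val available = List(0, 2, 3) // partition 1 is assumed to have no leader

  // 50 keyless sends with no metadata refresh in between.
  def run(): Seq[Int] = {
    val p = new RandomAvailablePartitioner[String](perBatch = false, random = new Random(42))
    (1 to 50).map(_ => p.partition(null, 4, available, isNewBatch = true, isRefreshMetadata = false))
  }
}
```

With perBatch = false every send in the demo lands on the same available partition until a refresh is signaled; with perBatch = true the choice would be redrawn per batch, which corresponds to the "random partition per batch" behavior.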
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <
> > > > > crypt...@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Sounds good, I will create a JIRA and upload a
> patch.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sep 17, 2013, at 1:19 PM, Joel Koshy <
> > > > > jjkosh...@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I agree that minimizing the number of producer connections (while
> > > > > > > > > > > > > > being a good thing) is really required in very large production
> > > > > > > > > > > > > > deployments, and the net-effect of the existing change is
> > > > > > > > > > > > > > counter-intuitive to users who expect an immediate even distribution
> > > > > > > > > > > > > > across _all_ partitions of the topic.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > However, I don't think it is a hack because it is almost exactly the
> > > > > > > > > > > > > > same behavior as 0.7 in one of its modes. The 0.7 producer (which I
> > > > > > > > > > > > > > think was even more confusing) had three modes:
> > > > > > > > > > > > > > i) ZK send
> > > > > > > > > > > > > > ii) Config send(a): static list of broker1:port1,broker2:port2,etc.
> > > > > > > > > > > > > > iii) Config send(b): static list of a hardwareVIP:VIPport
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > (i) and (ii) would achieve even distribution. (iii) would effectively
> > > > > > > > > > > > > > select one broker and distribute to partitions on that broker within
> > > > > > > > > > > > > > each reconnect interval. (iii) is very similar to what we now do in
> > > > > > > > > > > > > > 0.8. (Although we stick to one partition during each metadata refresh
> > > > > > > > > > > > > > interval, that can be changed to stick to one broker and distribute
> > > > > > > > > > > > > > across partitions on that broker.)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > At the same time, I agree with Joe's suggestion that we should keep
> > > > > > > > > > > > > > the more intuitive pre-KAFKA-1017 behavior as the default and move the
> > > > > > > > > > > > > > change in KAFKA-1017 to a more specific partitioner implementation.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Joel
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <
> > > > > > > > jay.kr...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >> Let me ask another question which I think is more objective. Let's say 100
> > > > > > > > > > > > > > >> random, smart infrastructure specialists try Kafka; of these 100, how many
> > > > > > > > > > > > > > >> do you believe will
> > > > > > > > > > > > > > >> 1. Say that this behavior is what they expected to happen?
> > > > > > > > > > > > > > >> 2. Be happy with this behavior?
> > > > > > > > > > > > > > >> I am not being facetious; I am genuinely looking for a numerical estimate. I
> > > > > > > > > > > > > > >> am trying to figure out if nobody thought about this or if my estimate is
> > > > > > > > > > > > > > >> just really different. For what it is worth, my estimate is 0 and 5
> > > > > > > > > > > > > > >> respectively.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> This would be fine except that we changed it from the good behavior to the
> > > > > > > > > > > > > > >> bad behavior to fix an issue that probably only we have.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> -Jay
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <
> > > > > > > > jay.kr...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>> I just took a look at this change. I agree with Joe, not to put too fine a
> > > > > > > > > > > > > > >>> point on it, but this is a confusing hack.
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> Jun, I don't think wanting to minimize the number of TCP connections is
> > > > > > > > > > > > > > >>> going to be a very common need for people with less than 10k producers. I
> > > > > > > > > > > > > > >>> also don't think people are going to get very good load balancing out of
> > > > > > > > > > > > > > >>> this because most people don't have a ton of producers. I think instead we
> > > > > > > > > > > > > > >>> will spend the next year explaining this behavior which 99% of people will
> > > > > > > > > > > > > > >>> think is a bug (because it is crazy, non-intuitive, and breaks their usage).
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> Why was this done by adding special default behavior in the null key case
> > > > > > > > > > > > > > >>> instead of as a partitioner? The argument that the partitioner interface
> > > > > > > > > > > > > > >>> doesn't have sufficient information to choose a partition is not a good
> > > > > > > > > > > > > > >>> argument for hacking in changes to the default, it is an argument for
> > > > > > > > > > > > > > >>> *improving* the partitioner interface.
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> The whole point of a partitioner interface is to make it possible to plug
> > > > > > > > > > > > > > >>> in non-standard behavior like this, right?
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> -Jay
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <
> > > > > > jun...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>> Joe,
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> Thanks for bringing this up. I want to clarify this a bit.
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> 1. Currently, the producer side logic is that if the partitioning key is
> > > > > > > > > > > > > > >>>> not provided (i.e., it is null), the partitioner won't be called. We did
> > > > > > > > > > > > > > >>>> that because we want to select a random and "available" partition to send
> > > > > > > > > > > > > > >>>> messages so that if some partitions are temporarily unavailable (because of
> > > > > > > > > > > > > > >>>> broker failures), messages can still be sent to other partitions. Doing
> > > > > > > > > > > > > > >>>> this in the partitioner is difficult since the partitioner doesn't know
> > > > > > > > > > > > > > >>>> which partitions are currently available (the DefaultEventHandler does).
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> 2. As Joel said, the common use case in production is that there are many
> > > > > > > > > > > > > > >>>> more producers than #partitions in a topic. In this case, sticking to a
> > > > > > > > > > > > > > >>>> partition for a few minutes is not going to cause too much imbalance in the
> > > > > > > > > > > > > > >>>> partitions and has the benefit of reducing the # of socket connections. My
> > > > > > > > > > > > > > >>>> feeling is that this will benefit most production users. In fact, if one
> > > > > > > > > > > > > > >>>> uses a hardware load balancer for producing data in 0.7, it behaves in
> > > > > > > > > > > > > > >>>> exactly the same way (a producer will stick to a broker until the reconnect
> > > > > > > > > > > > > > >>>> interval is reached).
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> 3. It is true that if one is testing a topic with more than one partition
> > > > > > > > > > > > > > >>>> (which is not the default value), this behavior can be a bit weird.
> > > > > > > > > > > > > > >>>> However, I think it can be mitigated by running multiple test producer
> > > > > > > > > > > > > > >>>> instances.
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> 4. Someone reported in the mailing list that all data shows in only one
> > > > > > > > > > > > > > >>>> partition after a few weeks. This is clearly not the expected behavior. We
> > > > > > > > > > > > > > >>>> can take a closer look to see if this is a real issue.
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> Do you think these address your concerns?
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> Jun
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <
> > > > > > > > > crypt...@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>>> How about creating a new class called RandomRefreshPartioner, copying the
> > > > > > > > > > > > > > >>>>> DefaultPartitioner code to it, and then reverting the DefaultPartitioner code?
> > > > > > > > > > > > > > >>>>> I appreciate this is a one time burden for folks using the existing
> > > > > > > > > > > > > > >>>>> 0.8-beta1 who bumped into KAFKA-1017 in production and have to switch to the
> > > > > > > > > > > > > > >>>>> RandomRefreshPartioner, and folks deploying to production will have to
> > > > > > > > > > > > > > >>>>> consider this property change.
> > > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> I make this suggestion keeping in mind the new folks that onboard with
> > > > > > > > > > > > > > >>>>> Kafka: when everyone is in development and testing mode for the first
> > > > > > > > > > > > > > >>>>> time, their experience would be as expected from how it would work in
> > > > > > > > > > > > > > >>>>> production this way. In dev/test when first using Kafka they won't have so
> > > > > > > > > > > > > > >>>>> many producers for partitions but would look to parallelize their consumers,
> > > > > > > > > > > > > > >>>>> IMHO.
> > > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> The random broker change sounds like maybe a bigger change now, this late
> > > > > > > > > > > > > > >>>>> in the release cycle, if we can accommodate folks trying Kafka for the first
> > > > > > > > > > > > > > >>>>> time and through their development and testing along with full blown
> > > > > > > > > > > > > > >>>>> production deploys.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> /*******************************************
> > > > > > > > > > > > > >>>>> Joe Stein
> > > > > > > > > > > > > >>>>> Founder, Principal Consultant
> > > > > > > > > > > > > >>>>> Big Data Open Source Security LLC
> > > > > > > > > > > > > >>>>> http://www.stealth.ly
> > > > > > > > > > > > > >>>>> Twitter: @allthingshadoop
> > > > > > > > > > > > > >>>>> ********************************************/
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> On Sep 14, 2013, at 8:17 AM, Joel Koshy <
> > > > > > > > jjkosh...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> Thanks for bringing this up - it is definitely an important point to
> > > > > > > > > > > > > > >>>>>>> discuss. The underlying issue of KAFKA-1017 was uncovered to some degree
> > > > > > > > > > > > > > >>>>>>> by the fact that in our deployment we did not significantly increase the
> > > > > > > > > > > > > > >>>>>>> total number of partitions over 0.7 - i.e., in 0.7 we had say four
> > > > > > > > > > > > > > >>>>>>> partitions per broker, now we are using (say) eight partitions across the
> > > > > > > > > > > > > > >>>>>>> cluster. So with random partitioning every producer would end up
> > > > > > > > > > > > > > >>>>>>> connecting to nearly every broker (unlike 0.7 in which we would connect to
> > > > > > > > > > > > > > >>>>>>> only one broker within each reconnect interval). In a production-scale
> > > > > > > > > > > > > > >>>>>>> deployment that causes the high number of connections that KAFKA-1017
> > > > > > > > > > > > > > >>>>>>> addresses.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> You are right that the fix of sticking to one partition over the metadata
> > > > > > > > > > > > > > >>>>>>> refresh interval goes against true consumer parallelism, but this would be
> > > > > > > > > > > > > > >>>>>>> the case only if there are few producers. If you have a sizable number of
> > > > > > > > > > > > > > >>>>>>> producers on average all partitions would get uniform volumes of data.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> One tweak to KAFKA-1017 that I think is reasonable would be instead of
> > > > > > > > > > > > > > >>>>>>> sticking to a random partition, stick to a random broker and send to
> > > > > > > > > > > > > > >>>>>>> random partitions within that broker. This would make the behavior closer
> > > > > > > > > > > > > > >>>>>>> to 0.7 wrt number of connections and random partitioning provided the
> > > > > > > > > > > > > > >>>>>>> number of partitions per broker is high enough, which is why I mentioned
> > > > > > > > > > > > > > >>>>>>> the partition count (in our usage) in 0.7 vs 0.8 above. Thoughts?
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>> Joel
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> On Friday, September 13, 2013, Joe Stein wrote:
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > > >>>>>>>> First, let me apologize for not realizing/noticing this until today. One
> > > > > > > > > > > > > > >>>>>>> reason I left my last company was not being paid to work on Kafka nor
> > > > > > > > > > > > > > >>>>>>> being able to afford any time for a while to work on it. Now in my new gig
> > > > > > > > > > > > > > >>>>>>> (just wrapped up my first week, woo hoo) while I am still not "paid to
> > > > > > > > > > > > > > >>>>>>> work on Kafka" I can afford some more time for it now and maybe in 6
> > > > > > > > > > > > > > >>>>>>> months I will be able to hire folks to work on Kafka (with more and more
> > > > > > > > > > > > > > >>>>>>> time for myself to work on it too) while we also work on client projects
> > > > > > > > > > > > > > >>>>>>> (especially Kafka based ones).
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> So, I understand about the changes that were made to fix open file handles
> > > > > > > > > > > > > > >>>>>>> and make the random pinning be time based (with a very large default
> > > > > > > > > > > > > > >>>>>>> time).  Got all that.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> But, doesn't this completely negate what has been communicated to the
> > > > > > > > > > > > > > >>>>>>> community for a very long time and the expectation they have? I think it
> > > > > > > > > > > > > > >>>>>>> does.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> The expected functionality for random partitioning is that "This can be
> > > > > > > > > > > > > > >>>>>>> done in a round-robin fashion simply to balance load" and that the
> > > > > > > > > > > > > > >>>>>>> "producer" does it for you.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> Isn't a primary use case for partitions to parallelize consumers? If so
> > > > > > > > > > > > > > >>>>>>> then the expectation would be that all consumers would be getting in
> > > > > > > > > > > > > > >>>>>>> parallel equally in a "round robin fashion" the data that was produced for
> > > > > > > > > > > > > > >>>>>>> the topic... simply to balance load...with the producer handling it and
> > > > > > > > > > > > > > >>>>>>> with the client application not having to do anything. This randomness
> > > > > > > > > > > > > > >>>>>>> occurring every 10 minutes can't balance load.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> If users are going to work around this anyways (as I would honestly do
> > > > > > > > > > > > > > >>>>>>> too) doing a pseudo semantic random key and essentially forcing real
> > > > > > > > > > > > > > >>>>>>> randomness to simply balance load to my consumers running in parallel
> > > > > > > > > > > > > > >>>>>>> would we still end up hitting the KAFKA-1017 problem anyways? If not then
> > > > > > > > > > > > > > >>>>>>> why can't we just give users the functionality and put back the 3 lines of
> > > > > > > > > > > > > > >>>>>>> code 1) if(key == null) 2) random.nextInt(numPartitions) 3) else ... If we
> > > > > > > > > > > > > > >>>>>>> would bump into KAFKA-1017 by working around it then we have not really
> > > > > > > > > > > > > > >>>>>>> solved the root cause problem and removing expected functionality for a
> > > > > > > > > > > > > > >>>>>>> corner case that might have other workarounds and/or code changes to solve
> > > > > > > > > > > > > > >>>>>>> it another way or am I still not getting something?
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> Also, I was looking at testRandomPartitioner in AsyncProducerTest and I
> > > > > > > > > > > > > > >>>>>>> don't see how this would ever fail, the assertion is always for
> > > > > > > > > > > > > > >>>>>>> partitionId == 0 and it should be checking that data is going to different
> > > > > > > > > > > > > > >>>>>>> partitions for a topic, right?
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> Let me know, I think this is an important discussion and even if it ends
> > > > > > > > > > > > > > >>>>>>> up as telling the community to only use one partition that is all you need
> > > > > > > > > > > > > > >>>>>>> and partitions become our super columns (Apache Cassandra joke, it's
> > > > > > > > > > > > > > >>>>>>> funny) then we manage and support it and that is just how it is but if
> > > > > > > > > > > > > > >>>>>>> partitions are a good thing and having multiple consumers scale in
> > > > > > > > > > > > > > >>>>>>> parallel for a single topic also good then we have to manage and support
> > > > > > > > > > > > > > >>>>>>> that.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> /*******************************************
> > > > > > > > > > > > > > >>>>>>> Joe Stein
> > > > > > > > > > > > > > >>>>>>> Founder, Principal Consultant
> > > > > > > > > > > > > > >>>>>>> Big Data Open Source Security LLC
> > > > > > > > > > > > > > >>>>>>> http://www.stealth.ly
> > > > > > > > > > > > > > >>>>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > > > > > > > > > > > >>>>>>> ********************************************/
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
