I have updated the KIP based on the latest discussion. Please check and let
me know if there is any further concern.

Thanks,

Jiangjie (Becket) Qin

On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin <becket....@gmail.com> wrote:

> Actually second thought on this, rate might be better for two reasons:
> 1. Most of the metrics in the producer we already have are using rate
> instead of count.
> 2. If a service is bounced, the count will be reset to 0, but it does not
> affect rate.
>
> I'll make the change.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Dong,
>>
>> Yes, there is a sensor in the patch about the split occurrence.
>>
>> Currently it is a count instead of rate. In practice, it seems count is
>> easier to use in this case. But I am open to change.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hey Becket,
>>>
>>> I haven't looked at the patch yet. But since we are going to try the
>>> split-on-oversize solution, should the KIP also add a sensor that shows
>>> the
>>> rate of split per second and the probability of split?
>>>
>>> Thanks,
>>> Dong
>>>
>>>
>>> On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin <becket....@gmail.com> wrote:
>>>
>>> > Just to clarify, the implementation is basically what I mentioned above
>>> > (split/resend + adjusted estimation evolving algorithm) and changing
>>> the
>>> > compression ratio estimation to be per topic.
>>> >
>>> > Thanks,
>>> >
>>> > Jiangjie (Becket) Qin
>>> >
>>> > On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin <becket....@gmail.com>
>>> wrote:
>>> >
>>> > > I went ahead and have a patch submitted here:
>>> > > https://github.com/apache/kafka/pull/2638
>>> > >
>>> > > Per Joel's suggestion, I changed the compression ratio to be per
>>> topic as
>>> > > well. It seems working well. Since there is an important behavior
>>> change
>>> > > and a new sensor is added, I'll keep the KIP and update it according.
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jiangjie (Becket) Qin
>>> > >
>>> > > On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <jjkosh...@gmail.com>
>>> wrote:
>>> > >
>>> > >> >
>>> > >> > Lets say we sent the batch over the wire and received a
>>> > >> > RecordTooLargeException, how do we split it as once we add the
>>> message
>>> > >> to
>>> > >> > the batch we loose the message level granularity. We will have to
>>> > >> > decompress, do deep iteration and split and again compress. right?
>>> > This
>>> > >> > looks like a performance bottle neck in case of multi topic
>>> producers
>>> > >> like
>>> > >> > mirror maker.
>>> > >> >
>>> > >>
>>> > >> Yes, but these should be outliers if we do estimation on a per-topic
>>> > basis
>>> > >> and if we target a conservative-enough compression ratio. The
>>> producer
>>> > >> should also avoid sending over the wire if it can be made aware of
>>> the
>>> > >> max-message size limit on the broker, and split if it determines
>>> that a
>>> > >> record exceeds the broker's config. Ideally this should be part of
>>> topic
>>> > >> metadata but is not - so it could be off a periodic describe-configs
>>> > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+
>>> > >> Command+line+and+centralized+administrative+operations#KIP-
>>> > >> 4-Commandlineandcentralizedadministrativeoperations-Describe
>>> > >> ConfigsRequest>
>>> > >> (which isn't available yet). This doesn't remove the need to split
>>> and
>>> > >> recompress though.
>>> > >>
>>> > >>
>>> > >> > On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <
>>> becket....@gmail.com>
>>> > >> wrote:
>>> > >> >
>>> > >> > > Hey Mayuresh,
>>> > >> > >
>>> > >> > > 1) The batch would be split when an RecordTooLargeException is
>>> > >> received.
>>> > >> > > 2) Not lower the actual compression ratio, but lower the
>>> estimated
>>> > >> > > compression ratio "according to" the Actual Compression
>>> Ratio(ACR).
>>> > >> > >
>>> > >> > > An example, let's start with Estimated Compression Ratio (ECR) =
>>> > 1.0.
>>> > >> Say
>>> > >> > > the compression ratio of ACR is ~0.8, instead of letting the ECR
>>> > >> dropped
>>> > >> > to
>>> > >> > > 0.8 very quickly, we only drop 0.001 every time when ACR < ECR.
>>> > >> However,
>>> > >> > > once we see an ACR > ECR, we increment ECR by 0.05. If a
>>> > >> > > RecordTooLargeException is received, we reset the ECR back to
>>> 1.0
>>> > and
>>> > >> > split
>>> > >> > > the batch.
>>> > >> > >
>>> > >> > > Thanks,
>>> > >> > >
>>> > >> > > Jiangjie (Becket) Qin
>>> > >> > >
>>> > >> > >
>>> > >> > >
>>> > >> > > On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat <
>>> > >> > > gharatmayures...@gmail.com> wrote:
>>> > >> > >
>>> > >> > > > Hi Becket,
>>> > >> > > >
>>> > >> > > > Seems like an interesting idea.
>>> > >> > > > I had couple of questions :
>>> > >> > > > 1) How do we decide when the batch should be split?
>>> > >> > > > 2) What do you mean by slowly lowering the "actual"
>>> compression
>>> > >> ratio?
>>> > >> > > > An example would really help here.
>>> > >> > > >
>>> > >> > > > Thanks,
>>> > >> > > >
>>> > >> > > > Mayuresh
>>> > >> > > >
>>> > >> > > > On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin <
>>> becket....@gmail.com
>>> > >
>>> > >> > > wrote:
>>> > >> > > >
>>> > >> > > > > Hi Jay,
>>> > >> > > > >
>>> > >> > > > > Yeah, I got your point.
>>> > >> > > > >
>>> > >> > > > > I think there might be a solution which do not require
>>> adding a
>>> > >> new
>>> > >> > > > > configuration. We can start from a very conservative
>>> compression
>>> > >> > ratio
>>> > >> > > > say
>>> > >> > > > > 1.0 and lower it very slowly according to the actual
>>> compression
>>> > >> > ratio
>>> > >> > > > > until we hit a point that we have to split a batch. At that
>>> > >> point, we
>>> > >> > > > > exponentially back off on the compression ratio. The idea is
>>> > >> somewhat
>>> > >> > > > like
>>> > >> > > > > TCP. This should help avoid frequent split.
>>> > >> > > > >
>>> > >> > > > > The upper bound of the batch size is also a little awkward
>>> today
>>> > >> > > because
>>> > >> > > > we
>>> > >> > > > > say the batch size is based on compressed size, but users
>>> cannot
>>> > >> set
>>> > >> > it
>>> > >> > > > to
>>> > >> > > > > the max message size because that will result in oversized
>>> > >> messages.
>>> > >> > > With
>>> > >> > > > > this change we will be able to allow the users to set the
>>> > message
>>> > >> > size
>>> > >> > > to
>>> > >> > > > > close to max message size.
>>> > >> > > > >
>>> > >> > > > > However the downside is that there could be latency spikes
>>> in
>>> > the
>>> > >> > > system
>>> > >> > > > in
>>> > >> > > > > this case due to the splitting, especially when there are
>>> many
>>> > >> > messages
>>> > >> > > > > need to be split at the same time. That could potentially
>>> be an
>>> > >> issue
>>> > >> > > for
>>> > >> > > > > some users.
>>> > >> > > > >
>>> > >> > > > > What do you think about this approach?
>>> > >> > > > >
>>> > >> > > > > Thanks,
>>> > >> > > > >
>>> > >> > > > > Jiangjie (Becket) Qin
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > > On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps <
>>> j...@confluent.io>
>>> > >> wrote:
>>> > >> > > > >
>>> > >> > > > > > Hey Becket,
>>> > >> > > > > >
>>> > >> > > > > > Yeah that makes sense.
>>> > >> > > > > >
>>> > >> > > > > > I agree that you'd really have to both fix the estimation
>>> > (i.e.
>>> > >> > make
>>> > >> > > it
>>> > >> > > > > per
>>> > >> > > > > > topic or make it better estimate the high percentiles) AND
>>> > have
>>> > >> the
>>> > >> > > > > > recovery mechanism. If you are underestimating often and
>>> then
>>> > >> > paying
>>> > >> > > a
>>> > >> > > > > high
>>> > >> > > > > > recovery price that won't fly.
>>> > >> > > > > >
>>> > >> > > > > > I think you take my main point though, which is just that
>>> I
>>> > >> hate to
>>> > >> > > > > exposes
>>> > >> > > > > > these super low level options to users because it is so
>>> hard
>>> > to
>>> > >> > > explain
>>> > >> > > > > to
>>> > >> > > > > > people what it means and how they should set it. So if it
>>> is
>>> > >> > possible
>>> > >> > > > to
>>> > >> > > > > > make either some combination of better estimation and
>>> > splitting
>>> > >> or
>>> > >> > > > better
>>> > >> > > > > > tolerance of overage that would be preferrable.
>>> > >> > > > > >
>>> > >> > > > > > -Jay
>>> > >> > > > > >
>>> > >> > > > > > On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin <
>>> > >> becket....@gmail.com
>>> > >> > >
>>> > >> > > > > wrote:
>>> > >> > > > > >
>>> > >> > > > > > > @Dong,
>>> > >> > > > > > >
>>> > >> > > > > > > Thanks for the comments. The default behavior of the
>>> > producer
>>> > >> > won't
>>> > >> > > > > > change.
>>> > >> > > > > > > If the users want to use the uncompressed message size,
>>> they
>>> > >> > > probably
>>> > >> > > > > > will
>>> > >> > > > > > > also bump up the batch size to somewhere close to the
>>> max
>>> > >> message
>>> > >> > > > size.
>>> > >> > > > > > > This would be in the document. BTW the default batch
>>> size is
>>> > >> 16K
>>> > >> > > > which
>>> > >> > > > > is
>>> > >> > > > > > > pretty small.
>>> > >> > > > > > >
>>> > >> > > > > > > @Jay,
>>> > >> > > > > > >
>>> > >> > > > > > > Yeah, we actually had debated quite a bit internally
>>> what is
>>> > >> the
>>> > >> > > best
>>> > >> > > > > > > solution to this.
>>> > >> > > > > > >
>>> > >> > > > > > > I completely agree it is a bug. In practice we usually
>>> leave
>>> > >> some
>>> > >> > > > > > headroom
>>> > >> > > > > > > to allow the compressed size to grow a little if the the
>>> > >> original
>>> > >> > > > > > messages
>>> > >> > > > > > > are not compressible, for example, 1000 KB instead of
>>> > exactly
>>> > >> 1
>>> > >> > MB.
>>> > >> > > > It
>>> > >> > > > > is
>>> > >> > > > > > > likely safe enough.
>>> > >> > > > > > >
>>> > >> > > > > > > The major concern for the rejected alternative is
>>> > >> performance. It
>>> > >> > > > > largely
>>> > >> > > > > > > depends on how frequent we need to split a batch, i.e.
>>> how
>>> > >> likely
>>> > >> > > the
>>> > >> > > > > > > estimation can go off. If we only need to the split work
>>> > >> > > > occasionally,
>>> > >> > > > > > the
>>> > >> > > > > > > cost would be amortized so we don't need to worry about
>>> it
>>> > too
>>> > >> > > much.
>>> > >> > > > > > > However, it looks that for a producer with shared
>>> topics,
>>> > the
>>> > >> > > > > estimation
>>> > >> > > > > > is
>>> > >> > > > > > > always off. As an example, consider two topics, one with
>>> > >> > > compression
>>> > >> > > > > > ratio
>>> > >> > > > > > > 0.6 the other 0.2, assuming exactly same traffic, the
>>> > average
>>> > >> > > > > compression
>>> > >> > > > > > > ratio would be roughly 0.4, which is not right for
>>> either of
>>> > >> the
>>> > >> > > > > topics.
>>> > >> > > > > > So
>>> > >> > > > > > > almost half of the batches (of the topics with 0.6
>>> > compression
>>> > >> > > ratio)
>>> > >> > > > > > will
>>> > >> > > > > > > end up larger than the configured batch size. When it
>>> comes
>>> > to
>>> > >> > more
>>> > >> > > > > > topics
>>> > >> > > > > > > such as mirror maker, this becomes more unpredictable.
>>> To
>>> > >> avoid
>>> > >> > > > > frequent
>>> > >> > > > > > > rejection / split of the batches, we need to configured
>>> the
>>> > >> batch
>>> > >> > > > size
>>> > >> > > > > > > pretty conservatively. This could actually hurt the
>>> > >> performance
>>> > >> > > > because
>>> > >> > > > > > we
>>> > >> > > > > > > are shoehorn the messages that are highly compressible
>>> to a
>>> > >> small
>>> > >> > > > batch
>>> > >> > > > > > so
>>> > >> > > > > > > that the other topics that are not that compressible
>>> will
>>> > not
>>> > >> > > become
>>> > >> > > > > too
>>> > >> > > > > > > large with the same batch size. At LinkedIn, our batch
>>> size
>>> > is
>>> > >> > > > > configured
>>> > >> > > > > > > to 64 KB because of this. I think we may actually have
>>> > better
>>> > >> > > > batching
>>> > >> > > > > if
>>> > >> > > > > > > we just use the uncompressed message size and 800 KB
>>> batch
>>> > >> size.
>>> > >> > > > > > >
>>> > >> > > > > > > We did not think about loosening the message size
>>> > restriction,
>>> > >> > but
>>> > >> > > > that
>>> > >> > > > > > > sounds a viable solution given that the consumer now can
>>> > fetch
>>> > >> > > > > oversized
>>> > >> > > > > > > messages. One concern would be that on the broker side
>>> > >> oversized
>>> > >> > > > > messages
>>> > >> > > > > > > will bring more memory pressure. With KIP-92, we may
>>> > mitigate
>>> > >> > that,
>>> > >> > > > but
>>> > >> > > > > > the
>>> > >> > > > > > > memory allocation for large messages may not be very GC
>>> > >> > friendly. I
>>> > >> > > > > need
>>> > >> > > > > > to
>>> > >> > > > > > > think about this a little more.
>>> > >> > > > > > >
>>> > >> > > > > > > Thanks,
>>> > >> > > > > > >
>>> > >> > > > > > > Jiangjie (Becket) Qin
>>> > >> > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > > > On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <
>>> > j...@confluent.io>
>>> > >> > > wrote:
>>> > >> > > > > > >
>>> > >> > > > > > > > Hey Becket,
>>> > >> > > > > > > >
>>> > >> > > > > > > > I get the problem we want to solve with this, but I
>>> don't
>>> > >> think
>>> > >> > > > this
>>> > >> > > > > is
>>> > >> > > > > > > > something that makes sense as a user controlled knob
>>> that
>>> > >> > > everyone
>>> > >> > > > > > > sending
>>> > >> > > > > > > > data to kafka has to think about. It is basically a
>>> bug,
>>> > >> right?
>>> > >> > > > > > > >
>>> > >> > > > > > > > First, as a technical question is it true that using
>>> the
>>> > >> > > > uncompressed
>>> > >> > > > > > > size
>>> > >> > > > > > > > for batching actually guarantees that you observe the
>>> > >> limit? I
>>> > >> > > > think
>>> > >> > > > > > that
>>> > >> > > > > > > > implies that compression always makes the messages
>>> > smaller,
>>> > >> > > which i
>>> > >> > > > > > think
>>> > >> > > > > > > > usually true but is not guaranteed, right? e.g. if
>>> someone
>>> > >> > > encrypts
>>> > >> > > > > > their
>>> > >> > > > > > > > data which tends to randomize it and then enables
>>> > >> > compressesion,
>>> > >> > > it
>>> > >> > > > > > could
>>> > >> > > > > > > > slightly get bigger?
>>> > >> > > > > > > >
>>> > >> > > > > > > > I also wonder if the rejected alternatives you
>>> describe
>>> > >> > couldn't
>>> > >> > > be
>>> > >> > > > > > made
>>> > >> > > > > > > to
>>> > >> > > > > > > > work: basically try to be a bit better at estimation
>>> and
>>> > >> > recover
>>> > >> > > > when
>>> > >> > > > > > we
>>> > >> > > > > > > > guess wrong. I don't think the memory usage should be
>>> a
>>> > >> > problem:
>>> > >> > > > > isn't
>>> > >> > > > > > it
>>> > >> > > > > > > > the same memory usage the consumer of that topic would
>>> > need?
>>> > >> > And
>>> > >> > > > > can't
>>> > >> > > > > > > you
>>> > >> > > > > > > > do the splitting and recompression in a streaming
>>> fashion?
>>> > >> If
>>> > >> > we
>>> > >> > > an
>>> > >> > > > > > make
>>> > >> > > > > > > > the estimation rate low and the recovery cost is just
>>> ~2x
>>> > >> the
>>> > >> > > > normal
>>> > >> > > > > > cost
>>> > >> > > > > > > > for that batch that should be totally fine, right?
>>> (It's
>>> > >> > > > technically
>>> > >> > > > > > true
>>> > >> > > > > > > > you might have to split more than once, but since you
>>> > halve
>>> > >> it
>>> > >> > > each
>>> > >> > > > > > time
>>> > >> > > > > > > I
>>> > >> > > > > > > > think should you get a number of halvings that is
>>> > >> logarithmic
>>> > >> > in
>>> > >> > > > the
>>> > >> > > > > > miss
>>> > >> > > > > > > > size, which, with better estimation you'd hope would
>>> be
>>> > >> super
>>> > >> > > duper
>>> > >> > > > > > > small).
>>> > >> > > > > > > >
>>> > >> > > > > > > > Alternatively maybe we could work on the other side
>>> of the
>>> > >> > > problem
>>> > >> > > > > and
>>> > >> > > > > > > try
>>> > >> > > > > > > > to make it so that a small miss on message size isn't
>>> a
>>> > big
>>> > >> > > > problem.
>>> > >> > > > > I
>>> > >> > > > > > > > think original issue was that max size and fetch size
>>> were
>>> > >> > > tightly
>>> > >> > > > > > > coupled
>>> > >> > > > > > > > and the way memory in the consumer worked you really
>>> > wanted
>>> > >> > fetch
>>> > >> > > > > size
>>> > >> > > > > > to
>>> > >> > > > > > > > be as small as possible because you'd use that much
>>> memory
>>> > >> per
>>> > >> > > > > fetched
>>> > >> > > > > > > > partition and the consumer would get stuck if its
>>> fetch
>>> > size
>>> > >> > > wasn't
>>> > >> > > > > big
>>> > >> > > > > > > > enough. I think we made some progress on that issue
>>> and
>>> > >> maybe
>>> > >> > > more
>>> > >> > > > > > could
>>> > >> > > > > > > be
>>> > >> > > > > > > > done there so that a small bit of fuzziness around the
>>> > size
>>> > >> > would
>>> > >> > > > not
>>> > >> > > > > > be
>>> > >> > > > > > > an
>>> > >> > > > > > > > issue?
>>> > >> > > > > > > >
>>> > >> > > > > > > > -Jay
>>> > >> > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > > > On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin <
>>> > >> > > becket....@gmail.com
>>> > >> > > > >
>>> > >> > > > > > > wrote:
>>> > >> > > > > > > >
>>> > >> > > > > > > > > Hi folks,
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > I would like to start the discussion thread on
>>> KIP-126.
>>> > >> The
>>> > >> > KIP
>>> > >> > > > > > propose
>>> > >> > > > > > > > > adding a new configuration to KafkaProducer to allow
>>> > >> batching
>>> > >> > > > based
>>> > >> > > > > > on
>>> > >> > > > > > > > > uncompressed message size.
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > Comments are welcome.
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > The KIP wiki is following:
>>> > >> > > > > > > > > https://cwiki.apache.org/confl
>>> uence/display/KAFKA/KIP-
>>> > >> > > > > > > > > 126+-+Allow+KafkaProducer+to+b
>>> > >> atch+based+on+uncompressed+siz
>>> > >> > e
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > Thanks,
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > Jiangjie (Becket) Qin
>>> > >> > > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > --
>>> > >> > > > -Regards,
>>> > >> > > > Mayuresh R. Gharat
>>> > >> > > > (862) 250-7125
>>> > >> > > >
>>> > >> > >
>>> > >> >
>>> > >> >
>>> > >> >
>>> > >> > --
>>> > >> > -Regards,
>>> > >> > Mayuresh R. Gharat
>>> > >> > (862) 250-7125
>>> > >> >
>>> > >>
>>> > >
>>> > >
>>> >
>>>
>>
>>
>

Reply via email to