>
> Let's say we sent the batch over the wire and received a
> RecordTooLargeException. How do we split it, given that once we add a
> message to the batch we lose message-level granularity? We would have to
> decompress, iterate deeply, split, and compress again, right? This looks
> like a performance bottleneck for multi-topic producers like mirror maker.
>

Yes, but these should be outliers if we do estimation on a per-topic basis
and if we target a conservative-enough compression ratio. The producer
should also avoid sending over the wire if it can be made aware of the
max message size limit on the broker, and split if it determines that a
record exceeds the broker's config. Ideally this should be part of the
topic metadata but is not - so it could be driven off a periodic
describe-configs
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest>
(which isn't available yet). This doesn't remove the need to split and
recompress though.
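
To make that concrete, below is a minimal sketch of the kind of pre-send
check I have in mind. Everything here is hypothetical - the names are made
up, and brokerMaxMessageBytes assumes a describe-configs (or topic
metadata) lookup that the producer cannot do today:

// Illustrative sketch only: names are hypothetical, and brokerMaxMessageBytes
// assumes a lookup of the broker's message.max.bytes that is not available yet.
final class PreSendCheck {
    // Decide whether a batch should be split before sending, based on the
    // estimated compressed size versus the broker's max message size.
    static boolean needsSplit(long uncompressedBytes,
                              double estimatedCompressionRatio,
                              long brokerMaxMessageBytes) {
        long estimatedWireBytes =
            (long) (uncompressedBytes * estimatedCompressionRatio);
        return estimatedWireBytes > brokerMaxMessageBytes;
    }
}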


> On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hey Mayuresh,
> >
> > 1) The batch would be split when a RecordTooLargeException is received.
> > 2) Not lowering the actual compression ratio, but lowering the estimated
> > compression ratio "according to" the Actual Compression Ratio (ACR).
> >
> > As an example, let's start with an Estimated Compression Ratio (ECR) of
> > 1.0. Say the ACR is ~0.8. Instead of letting the ECR drop to 0.8 very
> > quickly, we only drop it by 0.001 each time ACR < ECR. However, once we
> > see an ACR > ECR, we increment the ECR by 0.05. If a
> > RecordTooLargeException is received, we reset the ECR back to 1.0 and
> > split the batch.
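
For concreteness, here is a rough sketch of the adjustment Becket describes
above. The 0.001/0.05 steps and the reset to 1.0 come straight from his
example; capping the estimate at 1.0 on the way up and all of the names are
my own assumptions:

// Sketch of the adaptive estimator described above. The step sizes and the
// reset-to-1.0 on RecordTooLargeException follow the example; the cap at 1.0
// is an assumption, and the class/field names are illustrative.
final class CompressionRatioEstimator {
    private static final double STEP_DOWN = 0.001;
    private static final double STEP_UP = 0.05;

    private volatile double estimatedRatio = 1.0; // start conservatively

    // Called with the actual compression ratio (ACR) observed for a batch.
    void observe(double actualRatio) {
        if (actualRatio < estimatedRatio) {
            estimatedRatio -= STEP_DOWN;                              // drop slowly
        } else {
            estimatedRatio = Math.min(1.0, estimatedRatio + STEP_UP); // back off quickly
        }
    }

    // Called when the broker rejects a batch with RecordTooLargeException;
    // the caller would then split the batch.
    void onRecordTooLarge() {
        estimatedRatio = 1.0;
    }

    double estimate() {
        return estimatedRatio;
    }
}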
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi Becket,
> > >
> > > Seems like an interesting idea.
> > > I had a couple of questions:
> > > 1) How do we decide when the batch should be split?
> > > 2) What do you mean by slowly lowering the "actual" compression ratio?
> > > An example would really help here.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > Yeah, I got your point.
> > > >
> > > > I think there might be a solution which does not require adding a new
> > > > configuration. We can start from a very conservative compression
> > > > ratio, say 1.0, and lower it very slowly according to the actual
> > > > compression ratio until we hit a point where we have to split a batch.
> > > > At that point, we exponentially back off on the compression ratio. The
> > > > idea is somewhat like TCP. This should help avoid frequent splits.
> > > >
> > > > The upper bound of the batch size is also a little awkward today,
> > > > because we say the batch size is based on the compressed size, but
> > > > users cannot set it to the max message size since that would result in
> > > > oversized messages. With this change we will be able to let users set
> > > > the batch size close to the max message size.
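
As an aside, if batching were based on uncompressed size, an illustrative
producer setup might look like the sketch below. The values are examples
only, not recommendations; message.max.bytes is the broker-side limit
(roughly 1 MB by default):

import java.util.Properties;

// Example producer settings if batching were based on uncompressed size.
// The values are illustrative, not recommendations.
final class ExampleProducerConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("compression.type", "gzip");
        // With uncompressed-size batching, batch.size could approach the
        // broker's message.max.bytes (~1 MB by default) instead of being kept
        // small to absorb estimation error.
        props.put("batch.size", Integer.toString(800 * 1024));
        // Cap on the serialized (compressed) request the producer will send.
        props.put("max.request.size", Integer.toString(1024 * 1024));
        return props;
    }
}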
> > > >
> > > > However, the downside is that there could be latency spikes in the
> > > > system due to the splitting, especially when many messages need to be
> > > > split at the same time. That could potentially be an issue for some
> > > > users.
> > > >
> > > > What do you think about this approach?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > Hey Becket,
> > > > >
> > > > > Yeah that makes sense.
> > > > >
> > > > > I agree that you'd really have to both fix the estimation (i.e. make
> > > > > it per-topic or make it better estimate the high percentiles) AND
> > > > > have the recovery mechanism. If you are underestimating often and
> > > > > then paying a high recovery price, that won't fly.
> > > > >
> > > > > I think you take my main point though, which is just that I hate to
> > > > > expose these super-low-level options to users, because it is so hard
> > > > > to explain to people what they mean and how they should be set. So if
> > > > > some combination of better estimation and splitting, or better
> > > > > tolerance of overage, can be made to work, that would be preferable.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin <becket....@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > @Dong,
> > > > > >
> > > > > > Thanks for the comments. The default behavior of the producer
> > > > > > won't change. If users want to use the uncompressed message size,
> > > > > > they will probably also bump up the batch size to somewhere close
> > > > > > to the max message size. This will be in the documentation. BTW,
> > > > > > the default batch size is 16 KB, which is pretty small.
> > > > > >
> > > > > > @Jay,
> > > > > >
> > > > > > Yeah, we actually debated quite a bit internally what the best
> > > > > > solution to this is.
> > > > > >
> > > > > > I completely agree it is a bug. In practice we usually leave some
> > > > > > headroom to allow the compressed size to grow a little if the
> > > > > > original messages are not compressible, for example 1000 KB instead
> > > > > > of exactly 1 MB. That is likely safe enough.
> > > > > >
> > > > > > The major concern with the rejected alternative is performance. It
> > > > > > largely depends on how frequently we need to split a batch, i.e.
> > > > > > how likely the estimation is to go off. If we only need to do the
> > > > > > split work occasionally, the cost would be amortized, so we don't
> > > > > > need to worry about it too much. However, it looks like for a
> > > > > > producer shared across topics the estimation is always off. As an
> > > > > > example, consider two topics, one with compression ratio 0.6 and
> > > > > > the other 0.2. Assuming exactly the same traffic, the average
> > > > > > compression ratio would be roughly 0.4, which is not right for
> > > > > > either topic. So almost half of the batches (those of the topic
> > > > > > with compression ratio 0.6) will end up larger than the configured
> > > > > > batch size. With more topics, as in mirror maker, this becomes even
> > > > > > more unpredictable. To avoid frequent rejection / splitting of
> > > > > > batches, we would need to configure the batch size pretty
> > > > > > conservatively. This can actually hurt performance, because we are
> > > > > > shoehorning the highly compressible messages into a small batch so
> > > > > > that the other, less compressible topics will not become too large
> > > > > > with the same batch size. At LinkedIn, our batch size is configured
> > > > > > to 64 KB because of this. I think we may actually get better
> > > > > > batching if we just use the uncompressed message size and an 800 KB
> > > > > > batch size.
> > > > > >
> > > > > > We did not think about loosening the message size restriction, but
> > > > > > that sounds like a viable solution given that the consumer can now
> > > > > > fetch oversized messages. One concern is that on the broker side
> > > > > > oversized messages will bring more memory pressure. With KIP-92 we
> > > > > > may mitigate that, but the memory allocation for large messages may
> > > > > > not be very GC friendly. I need to think about this a little more.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <j...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Becket,
> > > > > > >
> > > > > > > I get the problem we want to solve with this, but I don't think
> > > > > > > this is something that makes sense as a user-controlled knob that
> > > > > > > everyone sending data to Kafka has to think about. It is
> > > > > > > basically a bug, right?
> > > > > > >
> > > > > > > First, as a technical question: is it true that using the
> > > > > > > uncompressed size for batching actually guarantees that you
> > > > > > > observe the limit? I think that implies that compression always
> > > > > > > makes the messages smaller, which I think is usually true but is
> > > > > > > not guaranteed, right? E.g. if someone encrypts their data, which
> > > > > > > tends to randomize it, and then enables compression, it could get
> > > > > > > slightly bigger?
> > > > > > >
> > > > > > > I also wonder if the rejected alternatives you describe couldn't
> > > > > > > be made to work: basically try to be a bit better at estimation
> > > > > > > and recover when we guess wrong. I don't think the memory usage
> > > > > > > should be a problem: isn't it the same memory usage the consumer
> > > > > > > of that topic would need? And can't you do the splitting and
> > > > > > > recompression in a streaming fashion? If we can make the
> > > > > > > estimation miss rate low and the recovery cost is just ~2x the
> > > > > > > normal cost for that batch, that should be totally fine, right?
> > > > > > > (It's technically true you might have to split more than once,
> > > > > > > but since you halve it each time, I think you should get a number
> > > > > > > of halvings that is logarithmic in the miss size, which, with
> > > > > > > better estimation, you'd hope would be super duper small.)
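
(To illustrate the halving Jay describes above, a purely hypothetical split
routine is sketched below. It ignores how each piece would actually be
recompressed, and none of the names correspond to real producer internals.)

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.ToLongFunction;

// Purely illustrative halving split: keep dividing an oversized batch in two
// until every piece is estimated to fit. The number of splits grows
// logarithmically with how far the original batch overshot the limit.
final class BatchSplitter<R> {
    List<List<R>> split(List<R> records,
                        long maxBytes,
                        ToLongFunction<List<R>> compressedSize) {
        if (records.size() <= 1
                || compressedSize.applyAsLong(records) <= maxBytes) {
            return Collections.singletonList(records);
        }
        int mid = records.size() / 2;
        List<List<R>> pieces = new ArrayList<>();
        pieces.addAll(split(records.subList(0, mid), maxBytes, compressedSize));
        pieces.addAll(split(records.subList(mid, records.size()), maxBytes,
                            compressedSize));
        return pieces;
    }
}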
> > > > > > >
> > > > > > > Alternatively, maybe we could work on the other side of the
> > > > > > > problem and try to make it so that a small miss on message size
> > > > > > > isn't a big problem. I think the original issue was that max size
> > > > > > > and fetch size were tightly coupled, and the way memory in the
> > > > > > > consumer worked you really wanted the fetch size to be as small
> > > > > > > as possible, because you'd use that much memory per fetched
> > > > > > > partition and the consumer would get stuck if its fetch size
> > > > > > > wasn't big enough. I think we made some progress on that issue,
> > > > > > > and maybe more could be done there so that a small bit of
> > > > > > > fuzziness around the size would not be an issue?
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin
> > > > > > > <becket....@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi folks,
> > > > > > > >
> > > > > > > > I would like to start the discussion thread on KIP-126. The KIP
> > > > > > > > proposes adding a new configuration to KafkaProducer to allow
> > > > > > > > batching based on uncompressed message size.
> > > > > > > >
> > > > > > > > Comments are welcome.
> > > > > > > >
> > > > > > > > The KIP wiki is here:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
