[
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-527:
--------------------------------
Attachment: KAFKA-527.message-copy.history
Attached a file summarizing the copying operations. If we count shallow
iteration over message sets, there are about 8 copy operations throughout the
lifetime of a message. Among them, Copies 3/4/5 could be avoided, and we could
apply only one shallow iteration on each broker (leader and follower) and on
the consumer.
Given that offset assignment is currently the main reason for converting
between an array of Message and a ByteBufferMessageSet instead of appending
Message to ByteBufferMessageSet, and that trimming invalid bytes on the
consumer complicates its logic, one proposal for refactoring compression while
remaining backward compatible is:
1) When serving a fetch request, return only complete Messages within the
requested fetch size, so the consumer does not need to trim invalid bytes.
2) Use the first 6 bits of the Message attribute byte to indicate the number
of real messages it wraps. This limits the number of compressed messages in one
wrapper to 64, which I think is good enough (a bit-layout sketch follows this
list).
3) When doing dedup, delete only the payload of the message but keep its
counter, so that the offsets remain consecutively incremental.
4) For offset assignment on the broker, the ByteBufferMessageSet may contain
multiple wrapper Messages (since the compressed batch size is now limited to
64, one ProducerRequest may carry a ByteBufferMessageSet with multiple wrapper
messages); we need to use the shallowIterator to iterate over them and reassign
offsets without decompressing/recompressing (see the offset sketch after the
closing paragraph).
5) On the consumer side, allow shallow iteration as well as deep iteration;
with deep iteration, the consumer iterator needs to assign an offset to each
decompressed message based on its wrapper message's offset and size (also
covered in the offset sketch below). Also add the compression codec to the
MessageAndMetadata returned to consumers, which will then solve KAFKA-1011.
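To make the attribute-byte layout in 2) concrete, here is a minimal sketch in
Scala, assuming the low 2 bits keep the compression codec as they do today and
the high 6 bits carry the wrapped-message count stored as count - 1 (so that 64
is representable); the object and method names are illustrative, not existing
Kafka code:

    object WrapperAttributes {
      val CodecMask  = 0x03    // low 2 bits: compression codec, unchanged from the current format
      val CountShift = 2       // high 6 bits: number of wrapped messages, stored as count - 1
      val MaxCount   = 1 << 6  // 6 bits => at most 64 wrapped messages per wrapper

      def encode(codec: Int, count: Int): Byte = {
        require(count >= 1 && count <= MaxCount, s"count must be in [1, $MaxCount]")
        ((((count - 1) << CountShift) | (codec & CodecMask)) & 0xFF).toByte
      }

      def codec(attributes: Byte): Int = attributes & CodecMask
      def count(attributes: Byte): Int = ((attributes & 0xFF) >>> CountShift) + 1
    }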
With this we can completely remove de/re-compression on the brokers, and
always use the compressed wrapper message as the unit throughout the pipeline.
However, the cons of this approach are that it complicates logic on the
producer (the 64-message compression batch limit) and on the consumer
(re-computing message offsets).
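For 4) and 5), here is a minimal sketch of the offset handling, assuming each
wrapper exposes the count from the attribute byte above and carries the offset
of its last inner message (one possible convention); WrapperMessage and the two
functions are hypothetical stand-ins, not the real Message/ByteBufferMessageSet
API:

    // Hypothetical stand-in for a compressed wrapper message.
    case class WrapperMessage(var offset: Long, count: Int, compressedPayload: Array[Byte])

    // Broker side (step 4): a shallow pass over the wrappers reassigns offsets
    // without decompressing or recompressing; returns the new log end offset.
    def assignOffsets(wrappers: Seq[WrapperMessage], logEndOffset: Long): Long = {
      var nextOffset = logEndOffset
      for (w <- wrappers) {
        nextOffset += w.count        // reserve one offset per wrapped message
        w.offset = nextOffset - 1    // wrapper carries the offset of its last inner message
      }
      nextOffset
    }

    // Consumer side (step 5): deep iteration recomputes each inner message's offset
    // from the wrapper's offset and count, without the broker touching the payload.
    def innerOffsets(w: WrapperMessage): Seq[Long] =
      (w.offset - w.count + 1) to w.offset

For example, with logEndOffset = 100 and two wrappers holding 3 and 2 messages,
the wrappers get offsets 102 and 104, and deep iteration yields inner offsets
100..102 and 103..104.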
> Compression support does numerous byte copies
> ---------------------------------------------
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
> Issue Type: Bug
> Reporter: Jay Kreps
> Attachments: java.hprof.no-compression.txt, java.hprof.snappy.text,
> KAFKA-527.message-copy.history
>
>
> The data path for compressing or decompressing messages is extremely
> inefficient. We do something like 7 (?) complete copies of the data, often
> for simple things like adding a 4 byte size to the front. I am not sure how
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the
> Message/MessageSet interfaces, which are based on byte buffers, is the cause
> of many of these.
--
This message was sent by Atlassian JIRA
(v6.1#6144)