divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1521852820

   This is ready for review. A summary of the changes is provided below.
   
   **On the server:**
   1. This PR starts using buffer pools to allocate the intermediate buffer used by the stream that converts compressed data to uncompressed data. This is achieved by using the new `ChunkedBytesStream` instead of `BufferedInputStream` for ZSTD & GZIP. For LZ4 and SNAPPY, which weren't using `BufferedInputStream`, this is a no-op (see point 2 for changes to them). The impact of allocation on Zstd can be observed from the [before](https://issues.apache.org/jira/secure/attachment/13057480/flamegraph-trunk-heapalloc-before.html) & [after](https://issues.apache.org/jira/secure/attachment/13057479/flamegraph-pr-heapalloc-after.html) object allocation flamegraphs linked to the [JIRA](https://issues.apache.org/jira/browse/KAFKA-14633). Please observe how in the *after* flamegraph, the contribution of allocation by `validateMessagesAndAssignOffsets` has decreased drastically, from 39% to 5%.
   2. This PR reduces the number of buffers allocated during decompression from 2 to 1. Earlier we created a 2KB "skip buffer" for ALL compression algorithms, plus another intermediate buffer created by `BufferedInputStream` for some of the compression algorithms (ZSTD & GZIP). This PR uses a single intermediate buffer for ZSTD & GZIP, hence reducing the number of allocations to 1 (instead of 2). For LZ4 and SNAPPY, the number of allocations remains the same, but the 2KB skip buffer is now allocated from the buffer pool.
   3. The skip implementation for some compression algorithms allocates new buffers. For example, the skip implementation of ZSTD-JNI allocates a new buffer of a different size (from its buffer pool) on every skip invocation. This PR performs the skip using the intermediate buffer instead of pushing it down to ZSTD-JNI. (A minimal sketch of this overall pattern follows this list.)
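
   To make the pattern concrete, here is a minimal, illustrative sketch of the idea behind points 1-3. It is *not* the actual `ChunkedBytesStream` implementation; the class name `PooledChunkedStream` is hypothetical, and the `BufferSupplier` get/release usage is assumed from its pooling contract. The decompressing stream is read through a single pooled intermediate buffer, and `skip()` is satisfied from that same buffer instead of being delegated to the codec (e.g. zstd-jni):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

/**
 * Illustration only (not the actual ChunkedBytesStream): read the decompressing
 * stream through one pooled intermediate buffer and satisfy skip() from that
 * same buffer instead of delegating it to the underlying codec stream.
 */
public class PooledChunkedStream extends InputStream {
    private final InputStream source;          // decompressing stream (ZSTD, GZIP, ...)
    private final BufferSupplier bufferSupplier;
    private final ByteBuffer intermediate;     // single pooled chunk (assumed heap-backed)

    public PooledChunkedStream(InputStream source, BufferSupplier bufferSupplier, int chunkSize) {
        this.source = source;
        this.bufferSupplier = bufferSupplier;
        this.intermediate = bufferSupplier.get(chunkSize);
        this.intermediate.flip();              // start with an empty (fully consumed) chunk
    }

    // Pull the next chunk of uncompressed bytes from the source into the pooled buffer.
    private boolean refill() throws IOException {
        int n = source.read(intermediate.array(), intermediate.arrayOffset(), intermediate.capacity());
        if (n < 0)
            return false;
        intermediate.clear();
        intermediate.limit(n);
        return true;
    }

    @Override
    public int read() throws IOException {
        if (!intermediate.hasRemaining() && !refill())
            return -1;
        return intermediate.get() & 0xff;
    }

    @Override
    public long skip(long toSkip) throws IOException {
        long skipped = 0;
        while (skipped < toSkip) {
            if (!intermediate.hasRemaining() && !refill())
                break;
            int step = (int) Math.min(toSkip - skipped, intermediate.remaining());
            intermediate.position(intermediate.position() + step);
            skipped += step;                   // no extra allocation, no call into the codec's skip()
        }
        return skipped;
    }

    @Override
    public void close() throws IOException {
        bufferSupplier.release(intermediate);  // return the chunk to the pool
        source.close();
    }
}
```

   The point of the sketch is that a skip-heavy batch iterator can repeatedly call `skip(recordSize)` without triggering per-skip allocations inside the codec's own skip path.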
   
   The impact of the above changes on server throughput can be observed via `RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize`. You will notice a 20-70% improvement there (see the attached benchmark sheet in the description).
   
   **On the consumer:**
   Change 1 applies to the consumer as well, while changes 2 & 3 do not impact the consumer since it doesn't use a "skip" iterator.
   
   The impact of the above changes on consumer throughput can be observed via `RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize` (note that this is a different benchmark from the one used for the server; this one doesn't use a skipIterator). You will notice a mixed bag, ranging from single-digit regressions for some compression types to 10-50% improvements for Zstd. The reason we don't see equivalent gains on the consumer is that it copies all uncompressed data into a single buffer and then reads off it (see the sketch below). We have not reduced any buffer allocations for the consumer scenario (since changes 2 & 3 aren't applicable to consumers). There are other optimizations that we can perform for the consumer, listed below, but they are out of scope for this PR.
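
   For context, here is a rough, purely illustrative sketch of the extra per-record copy on the current non-skip read path. `StreamingRecordRead` and `readRecordBytes` are made-up names and the real code differs; the relevant part is that record bytes go from the stream's intermediate buffer into a second, per-record buffer before the key & value are read off it:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Illustration of the consumer-side (non-skip) flow described above: uncompressed data
// is staged in the decompressing stream's intermediate buffer, then copied again into a
// per-record buffer ("recordByteBuffer") before being parsed.
public class StreamingRecordRead {
    public static ByteBuffer readRecordBytes(DataInputStream decompressed, int recordSizeBytes) throws IOException {
        byte[] recordBytes = new byte[recordSizeBytes];  // second copy: the per-record buffer
        decompressed.readFully(recordBytes);             // first copy already happened inside the stream's chunk
        return ByteBuffer.wrap(recordBytes);
    }
}
```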
   
   **Future optimisations (out of scope of this PR)**
   1. For non-skip iterators (used by consumers), we currently allocate an intermediate buffer for decompression and then allocate another buffer for storing the key & value. The flow looks like: uncompressed data => intermediate buffer => inputStream => recordByteBuffer. This can be improved to uncompressed data => recordByteBuffer, and hence we would allocate only a single buffer (see the sketch after this list).
   2. We need to revisit whether we require a skipBuffer for LZ4 and SNAPPY at all. In the current PR, we wanted to maintain parity with the legacy implementation, hence a 2KB intermediate buffer in `ChunkedBytesStream` is used for them, but it could potentially be removed.
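
   A hypothetical sketch of what optimisation 1 above could look like (illustrative names only; this is not code from this PR): the batch is decompressed into one buffer and each record is handed out as a slice of it, so the per-record copy through an `InputStream` disappears:

```java
import java.nio.ByteBuffer;

// Sketch of the proposed "uncompressed data => recordByteBuffer" flow: decompress the
// whole batch into one buffer and expose records as views over it, avoiding the extra
// per-record copy.
public class SingleBufferRecordView {
    // Returns a read-only view of the next record and advances the batch buffer past it.
    public static ByteBuffer nextRecord(ByteBuffer decompressedBatch, int recordSizeBytes) {
        ByteBuffer record = decompressedBatch.slice();
        record.limit(recordSizeBytes);                   // view covers just this record's bytes
        decompressedBatch.position(decompressedBatch.position() + recordSizeBytes);
        return record.asReadOnlyBuffer();
    }
}
```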
   

