divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1083831592


##########
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##########
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I mistakenly stated in the PR description that I had changed this; I will
fix the description.
   
   I would like to make this (the skipping change) a separate code change,
because I think it is a performance regression: it introduces an extra buffer
allocation.
   
   This is because the zstd-jni implementation of skip() allocates (and later
releases) a "skip" buffer from the buffer pool on every call [1]. In the
current implementation, by contrast, we read all data into the same 16KB
output buffer, which is allocated only once. In both cases, the amount of data
copied from native memory to Java is the same. The only difference is whether
we read & skip in our code or in zstd-jni code. Pushing skip down to zstd-jni
would be beneficial once zstd-jni pushes it further down to the native layer.
   
   [1] 
https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdInputStreamNoFinalizer.java#L228
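
   To illustrate the "read & skip in our code" approach, here is a minimal
sketch, not the actual Kafka code. The class `SkipSketch`, the method
`skipByReading`, and the 4-byte scratch buffer are hypothetical names and
sizes chosen for the example; a plain `ByteArrayInputStream` stands in for the
decompressed stream. The point is that the scratch buffer is allocated once
and reused for every skip, rather than being allocated inside each skip()
call.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class SkipSketch {
    // Hypothetical helper: skips exactly n bytes by reading them into a
    // caller-owned scratch buffer, so nothing is allocated per call.
    static void skipByReading(InputStream in, byte[] scratch, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            int toRead = (int) Math.min(scratch.length, remaining);
            int read = in.read(scratch, 0, toRead);
            if (read < 0) {
                throw new EOFException("Stream ended with " + remaining + " bytes left to skip");
            }
            remaining -= read;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8};
        byte[] scratch = new byte[4]; // allocated once, reused for every skip
        InputStream in = new ByteArrayInputStream(data);
        skipByReading(in, scratch, 5);
        System.out.println(in.read()); // prints 6, the first byte after the skipped range
    }
}
```

   Calling `in.skip(n)` instead would move the same loop into the stream
implementation; for zstd-jni that implementation takes its own buffer from the
pool, as noted above.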
 



