BewareMyPower commented on issue #24464:
URL: https://github.com/apache/pulsar/issues/24464#issuecomment-3097212425

   Yes, this issue can be resolved by creating a short new buffer rather than 
modifying the original entry's buffer in place, as this code does: 
https://github.com/streamnative/kop/blob/14f9832d8a437c5b717e3c74fd7e4cda9810aff7/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/AbstractEntryFormatter.java#L70
   
   In our internal fork (KSN), it's fixed by adding a buffer list abstraction:
   
   ```java
       private final List<ByteBuf> bufferList;
   
       public void addLong(long value) {
           final var buffer = ALLOCATOR.buffer(8);
           bufferList.add(buffer);
           buffer.writeLong(value);
       }
   
       public void addSlice(ByteBuf buffer, int startOffset) {
           bufferList.add(buffer.retainedSlice(
                   buffer.readerIndex() + startOffset,
                   buffer.readableBytes() - startOffset));
       }
   ```
   
   and merge the buffers added in `addLong` and `addSlice`.
   
   ```java
   kafkaBuffers.addLong(startOffset);
   kafkaBuffers.addSlice(byteBuf, OFFSET_LENGTH);
   final var batchedByteBuf = kafkaBuffers.merge(); // the merged buffer
   ```
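
To make the idea concrete, here is a minimal, dependency-free sketch of the same pattern. It is not the KSN code: `BufferListSketch` is a hypothetical name, it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, and its `merge()` simply copies the parts into a new buffer, which is an assumption made for brevity (with Netty, a `CompositeByteBuf` could presumably avoid that copy). The point it illustrates is only that the new offset goes into a fresh 8-byte buffer while the original entry's bytes are never written to.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the buffer list described above, built on
// java.nio so it runs without Netty. Not the actual KSN implementation.
final class BufferListSketch {
    private final List<ByteBuffer> bufferList = new ArrayList<>();

    // Store the value in a fresh 8-byte buffer; no existing buffer is touched.
    void addLong(long value) {
        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(value);
        buffer.flip();
        bufferList.add(buffer);
    }

    // Add a read-only view of `buffer` that skips its first `startOffset`
    // readable bytes. The caller's position and limit are left unchanged.
    void addSlice(ByteBuffer buffer, int startOffset) {
        final ByteBuffer view = buffer.asReadOnlyBuffer();
        view.position(buffer.position() + startOffset);
        bufferList.add(view.slice());
    }

    // Assumption: copying into one new buffer is acceptable for this sketch.
    ByteBuffer merge() {
        int total = 0;
        for (ByteBuffer part : bufferList) {
            total += part.remaining();
        }
        final ByteBuffer merged = ByteBuffer.allocate(total);
        for (ByteBuffer part : bufferList) {
            merged.put(part.duplicate()); // duplicate() keeps the stored part's position intact
        }
        merged.flip();
        return merged;
    }
}
```

Used the same way as the snippet above (`addLong`, then `addSlice` with an 8-byte offset, then `merge`), the merged buffer starts with the new offset while the original entry still holds its old first 8 bytes.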


