This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51dbd175b08 KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
51dbd175b08 is described below

commit 51dbd175b08e78aeca03d6752847aa5f23c98659
Author: LinShunKang <linshunkang....@gmail.com>
AuthorDate: Fri Sep 30 01:59:47 2022 +0800

    KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../common/serialization/ByteBufferSerializer.java | 31 ++++++++++++++++------
 .../common/serialization/SerializationTest.java    | 19 +++++++++++++
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index 9fb12544e0f..5987688759e 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -16,25 +16,40 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.utils.Utils;
+
 import java.nio.ByteBuffer;
 
+/**
+ * ByteBufferSerializer will not change ByteBuffer's mark, position and limit.
+ * And do not need to flip before call <i>serialize(String, ByteBuffer)</i>. For example:
+ *
+ * <blockquote>
+ * <pre>
+ * ByteBufferSerializer serializer = ...; // Create Serializer
+ * ByteBuffer buffer = ...;               // Allocate ByteBuffer
+ * buffer.put(data);                      // Put data into buffer, do not need to flip
+ * serializer.serialize(topic, buffer);   // Serialize buffer
+ * </pre>
+ * </blockquote>
+ */
 public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+
+    @Override
     public byte[] serialize(String topic, ByteBuffer data) {
-        if (data == null)
+        if (data == null) {
             return null;
-
-        data.rewind();
+        }
 
         if (data.hasArray()) {
-            byte[] arr = data.array();
+            final byte[] arr = data.array();
             if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                 return arr;
             }
         }
 
-        byte[] ret = new byte[data.remaining()];
-        data.get(ret, 0, ret.length);
-        data.rewind();
-        return ret;
+        final ByteBuffer copyData = data.asReadOnlyBuffer();
+        copyData.flip();
+        return Utils.toArray(copyData);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 85c09dd17ae..eb1fee3943f 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Stack;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -368,4 +370,21 @@ public class SerializationTest {
 
         return Serdes.serdeFrom(serializer, deserializer);
     }
+
+    @Test
+    public void testByteBufferSerializer() {
+        final byte[] bytes = "Hello".getBytes(UTF_8);
+        final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
+        final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
+        final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
+        final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
+        try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
+            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
+            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
+            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
+            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
+            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+        }
+    }
 }

Reply via email to