This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 86d4022d488431f71e121609ae83f1672fd989a9 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Mon Oct 30 13:14:15 2023 -0700 KAFKA-15602: revert KAFKA-4852 (#14617) This PR reverts - https://github.com/apache/kafka/commit/51dbd175b08e78aeca03d6752847aa5f23c98659 - https://github.com/apache/kafka/commit/496ae054c2d43c0905167745bfb2f4a0725e9fc2 Reviewers: Philip Nee <p...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .../common/serialization/ByteBufferSerializer.java | 34 ++++++++++------------ .../common/serialization/SerializationTest.java | 17 ----------- 2 files changed, 15 insertions(+), 36 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java index 06b66a62cb0..43b3d6e38a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java @@ -16,38 +16,34 @@ */ package org.apache.kafka.common.serialization; -import org.apache.kafka.common.utils.Utils; - import java.nio.ByteBuffer; /** - * Do not need to flip before call <i>serialize(String, ByteBuffer)</i>. For example: - * - * <blockquote> - * <pre> - * ByteBufferSerializer serializer = ...; // Create Serializer - * ByteBuffer buffer = ...; // Allocate ByteBuffer - * buffer.put(data); // Put data into buffer, do not need to flip - * serializer.serialize(topic, buffer); // Serialize buffer - * </pre> - * </blockquote> + * {@code ByteBufferSerializer} always {@link ByteBuffer#rewind() rewinds} the position of the input buffer to zero for + * serialization. A manual rewind is not necessary. + * <p> + * Note: any existing buffer position is ignored. + * <p> + * The position is also rewound back to zero before {@link #serialize(String, ByteBuffer)} + * returns. */ public class ByteBufferSerializer implements Serializer<ByteBuffer> { - - @Override public byte[] serialize(String topic, ByteBuffer data) { - if (data == null) { + if (data == null) return null; - } + + data.rewind(); if (data.hasArray()) { - final byte[] arr = data.array(); + byte[] arr = data.array(); if (data.arrayOffset() == 0 && arr.length == data.remaining()) { return arr; } } - data.flip(); - return Utils.toArray(data); + byte[] ret = new byte[data.remaining()]; + data.get(ret, 0, ret.length); + data.rewind(); + return ret; } } diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 6607678dedf..cd576f842c3 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -375,23 +375,6 @@ public class SerializationTest { return Serdes.serdeFrom(serializer, deserializer); } - @Test - public void testByteBufferSerializer() { - final byte[] bytes = "Hello".getBytes(UTF_8); - final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes); - final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes); - final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes); - final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes); - final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes); - try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) { - assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0)); - assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1)); - assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2)); - assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0)); - assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1)); - } - } - @ParameterizedTest @ValueSource(booleans = { true, false }) public void testBooleanSerializer(Boolean dataToSerialize) {