This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 497f5004830 KAFKA-18183 replace BytesSerializer with 
ByteArraySerializer for producer/consumer (#18113)
497f5004830 is described below

commit 497f50048305733606769a688b940cadce98521f
Author: Logan Zhu <[email protected]>
AuthorDate: Sat Dec 14 01:42:32 2024 +0800

    KAFKA-18183 replace BytesSerializer with ByteArraySerializer for 
producer/consumer (#18113)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/common/test/api/ClusterInstance.java   | 12 ++++++------
 .../common/test/api/ClusterTestExtensionsTest.java      | 17 +++++++++--------
 2 files changed, 15 insertions(+), 14 deletions(-)

diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index b64e6d1a000..b6954e44923 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -38,8 +38,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.test.TestUtils;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.fault.FaultHandlerException;
@@ -159,8 +159,8 @@ public interface ClusterInstance {
 
     default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
         Map<String, Object> props = new HashMap<>(configs);
-        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
BytesSerializer.class.getName());
-        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
BytesSerializer.class.getName());
+        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
         return new KafkaProducer<>(props);
     }
@@ -171,8 +171,8 @@ public interface ClusterInstance {
 
     default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
         Map<String, Object> props = new HashMap<>(configs);
-        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
BytesDeserializer.class.getName());
-        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
BytesDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
         props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + 
TestUtils.randomString(5));
         props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
diff --git 
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
 
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index af7a39a92be..2a08d4e58eb 100644
--- 
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ 
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -32,13 +32,13 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.test.TestUtils;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -61,6 +61,7 @@ import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CL
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -301,11 +302,11 @@ public class ClusterTestExtensionsTest {
     })
     public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster) 
throws InterruptedException {
         String topic = "topic";
-        Bytes key = Bytes.wrap("key".getBytes());
-        Bytes value = Bytes.wrap("value".getBytes());
+        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
+        byte[] value = "value".getBytes(StandardCharsets.UTF_8);
         try (Admin adminClient = cluster.admin();
-             Producer<Bytes, Bytes> producer = cluster.producer();
-             Consumer<Bytes, Bytes> consumer = cluster.consumer()
+             Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer()
         ) {
             adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 
1)));
             assertNotNull(producer);
@@ -313,13 +314,13 @@ public class ClusterTestExtensionsTest {
             producer.send(new ProducerRecord<>(topic, key, value));
             producer.flush();
             consumer.subscribe(singletonList(topic));
-            List<ConsumerRecord<Bytes, Bytes>> records = new ArrayList<>();
+            List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
             TestUtils.waitForCondition(() -> {
                 consumer.poll(Duration.ofMillis(100)).forEach(records::add);
                 return records.size() == 1;
             }, "Failed to receive message");
-            assertEquals(key, records.get(0).key());
-            assertEquals(value, records.get(0).value());
+            assertArrayEquals(key, records.get(0).key());
+            assertArrayEquals(value, records.get(0).value());
         }
     }
 

Reply via email to