This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 497f5004830 KAFKA-18183 replace BytesSerializer with
ByteArraySerializer for producer/consumer (#18113)
497f5004830 is described below
commit 497f50048305733606769a688b940cadce98521f
Author: Logan Zhu <[email protected]>
AuthorDate: Sat Dec 14 01:42:32 2024 +0800
KAFKA-18183 replace BytesSerializer with ByteArraySerializer for
producer/consumer (#18113)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/common/test/api/ClusterInstance.java | 12 ++++++------
.../common/test/api/ClusterTestExtensionsTest.java | 17 +++++++++--------
2 files changed, 15 insertions(+), 14 deletions(-)
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index b64e6d1a000..b6954e44923 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -38,8 +38,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.fault.FaultHandlerException;
@@ -159,8 +159,8 @@ public interface ClusterInstance {
default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
Map<String, Object> props = new HashMap<>(configs);
-        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
-        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
+        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
return new KafkaProducer<>(props);
}
@@ -171,8 +171,8 @@ public interface ClusterInstance {
default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
Map<String, Object> props = new HashMap<>(configs);
-        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
-        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" +
TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
diff --git
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index af7a39a92be..2a08d4e58eb 100644
---
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -32,13 +32,13 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.TestUtils;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -301,11 +302,11 @@ public class ClusterTestExtensionsTest {
})
public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster)
throws InterruptedException {
String topic = "topic";
- Bytes key = Bytes.wrap("key".getBytes());
- Bytes value = Bytes.wrap("value".getBytes());
+ byte[] key = "key".getBytes(StandardCharsets.UTF_8);
+ byte[] value = "value".getBytes(StandardCharsets.UTF_8);
try (Admin adminClient = cluster.admin();
- Producer<Bytes, Bytes> producer = cluster.producer();
- Consumer<Bytes, Bytes> consumer = cluster.consumer()
+ Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer()
) {
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short)
1)));
assertNotNull(producer);
@@ -313,13 +314,13 @@ public class ClusterTestExtensionsTest {
producer.send(new ProducerRecord<>(topic, key, value));
producer.flush();
consumer.subscribe(singletonList(topic));
- List<ConsumerRecord<Bytes, Bytes>> records = new ArrayList<>();
+ List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
return records.size() == 1;
}, "Failed to receive message");
- assertEquals(key, records.get(0).key());
- assertEquals(value, records.get(0).value());
+ assertArrayEquals(key, records.get(0).key());
+ assertArrayEquals(value, records.get(0).value());
}
}