This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e8d0d9850b KAFKA-14853 the serializer/deserialize which extends
ClusterResourceListener is not added to Metadata (#13460)
6e8d0d9850b is described below
commit 6e8d0d9850b05fc1de0ceaf77834e68939f782c1
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Wed Mar 29 16:02:04 2023 +0800
KAFKA-14853 the serializer/deserialize which extends
ClusterResourceListener is not added to Metadata (#13460)
Reviewers: dengziming <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../kafka/clients/producer/KafkaProducer.java | 4 +-
.../integration/kafka/api/BaseConsumerTest.scala | 47 +++++++++++++++++++++-
3 files changed, 49 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0088fbb5417..8576c6e052a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -727,8 +727,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
OffsetResetStrategy offsetResetStrategy =
OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(logContext,
offsetResetStrategy);
- ClusterResourceListeners clusterResourceListeners =
configureClusterResourceListeners(keyDeserializer,
- valueDeserializer, metrics.reporters(), interceptorList);
+ ClusterResourceListeners clusterResourceListeners =
configureClusterResourceListeners(this.keyDeserializer,
+ this.valueDeserializer, metrics.reporters(),
interceptorList);
this.metadata = new ConsumerMetadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
!config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 56ec6b0df52..9eb252b7086 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -401,8 +401,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.interceptors = interceptors;
else
this.interceptors = new
ProducerInterceptors<>(interceptorList);
- ClusterResourceListeners clusterResourceListeners =
configureClusterResourceListeners(keySerializer,
- valueSerializer, interceptorList, reporters);
+ ClusterResourceListeners clusterResourceListeners =
configureClusterResourceListeners(this.keySerializer,
+ this.valueSerializer, interceptorList, reporters);
this.maxRequestSize =
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize =
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType =
CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index fe560405033..08a3f32fe1a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -16,12 +16,16 @@
*/
package kafka.api
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.PartitionInfo
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener,
PartitionInfo}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -49,6 +53,27 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
sendAndAwaitAsyncCommit(consumer)
}
+ @Test
+ def testClusterResourceListener(): Unit = {
+ val numRecords = 100
+ val producerProps = new Properties()
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
classOf[BaseConsumerTest.TestClusterResourceListenerSerializer])
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
classOf[BaseConsumerTest.TestClusterResourceListenerSerializer])
+
+ val producer: KafkaProducer[Array[Byte], Array[Byte]] =
createProducer(keySerializer = null, valueSerializer = null, producerProps)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
+
+ val consumerProps = new Properties()
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
classOf[BaseConsumerTest.TestClusterResourceListenerDeserializer])
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
classOf[BaseConsumerTest.TestClusterResourceListenerDeserializer])
+ val consumer: KafkaConsumer[Array[Byte], Array[Byte]] =
createConsumer(keyDeserializer = null, valueDeserializer = null, consumerProps)
+ consumer.subscribe(List(tp.topic()).asJava)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingTimestamp = startingTimestamp)
+ assertNotEquals(0, BaseConsumerTest.updateProducerCount.get())
+ assertNotEquals(0, BaseConsumerTest.updateConsumerCount.get())
+ }
+
@Test
def testCoordinatorFailover(): Unit = {
val listener = new TestConsumerReassignmentListener()
@@ -79,3 +104,21 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
ensureNoRebalance(consumer, listener)
}
}
+
+object BaseConsumerTest {
+ val updateProducerCount = new AtomicInteger()
+ val updateConsumerCount = new AtomicInteger()
+
+ class TestClusterResourceListenerSerializer extends Serializer[Array[Byte]]
with ClusterResourceListener {
+
+ override def onUpdate(clusterResource: ClusterResource): Unit =
updateProducerCount.incrementAndGet();
+
+ override def serialize(topic: String, data: Array[Byte]): Array[Byte] =
data
+ }
+
+ class TestClusterResourceListenerDeserializer extends
Deserializer[Array[Byte]] with ClusterResourceListener {
+
+ override def onUpdate(clusterResource: ClusterResource): Unit =
updateConsumerCount.incrementAndGet();
+ override def deserialize(topic: String, data: Array[Byte]): Array[Byte] =
data
+ }
+}