This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aafc1ae27e0 KAFKA-19056 Rewrite EndToEndClusterIdTest in Java and move 
it to the server module (#19741)
aafc1ae27e0 is described below

commit aafc1ae27e07cc1891db970871de296836094692
Author: Ming-Yen Chung <mingyen...@gmail.com>
AuthorDate: Thu May 29 19:08:05 2025 +0800

    KAFKA-19056 Rewrite EndToEndClusterIdTest in Java and move it to the server 
module (#19741)
    
    Rewrite `EndToEndClusterIdTest` in Java using `@ClusterTest` and move it
    to the server module.
    
    Reviewers: Ken Huang <s7133...@gmail.com>, Chia-Ping Tsai
     <chia7...@gmail.com>
---
 .../kafka/api/EndToEndClusterIdTest.scala          | 217 -------------------
 .../apache/kafka/api/EndToEndClusterIdTest.java    | 231 +++++++++++++++++++++
 2 files changed, 231 insertions(+), 217 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
deleted file mode 100644
index 3b49a7a196b..00000000000
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.api
-
-import java.util.concurrent.ExecutionException
-import java.util.concurrent.atomic.AtomicReference
-import java.util.Properties
-import kafka.integration.KafkaServerTestHarness
-import kafka.server._
-import kafka.utils._
-import kafka.utils.Implicits._
-import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
-import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, 
TopicPartition}
-import org.apache.kafka.server.metrics.MetricConfigs
-import org.apache.kafka.test.{TestUtils => _, _}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
-
-import org.apache.kafka.test.TestUtils.isValidClusterId
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-/** The test cases here verify the following conditions.
-  * 1. The ProducerInterceptor receives the cluster id after the onSend() 
method is called and before onAcknowledgement() method is called.
-  * 2. The Serializer receives the cluster id before the serialize() method is 
called.
-  * 3. The producer MetricReporter receives the cluster id after send() method 
is called on KafkaProducer.
-  * 4. The ConsumerInterceptor receives the cluster id before the onConsume() 
method.
-  * 5. The Deserializer receives the cluster id before the deserialize() 
method is called.
-  * 6. The consumer MetricReporter receives the cluster id after poll() is 
called on KafkaConsumer.
-  * 7. The broker MetricReporter receives the cluster id after the broker 
startup is over.
-  * 8. The broker KafkaMetricReporter receives the cluster id after the broker 
startup is over.
-  * 9. All the components receive the same cluster id.
-  */
-
-object EndToEndClusterIdTest {
-
-  object MockConsumerMetricsReporter {
-    val CLUSTER_META = new AtomicReference[ClusterResource]
-  }
-
-  class MockConsumerMetricsReporter extends MockMetricsReporter with 
ClusterResourceListener {
-
-    override def onUpdate(clusterMetadata: ClusterResource): Unit = {
-      MockConsumerMetricsReporter.CLUSTER_META.set(clusterMetadata)
-    }
-  }
-
-  object MockProducerMetricsReporter {
-    val CLUSTER_META = new AtomicReference[ClusterResource]
-  }
-
-  class MockProducerMetricsReporter extends MockMetricsReporter with 
ClusterResourceListener {
-
-    override def onUpdate(clusterMetadata: ClusterResource): Unit = {
-      MockProducerMetricsReporter.CLUSTER_META.set(clusterMetadata)
-    }
-  }
-
-  object MockBrokerMetricsReporter {
-    val CLUSTER_META = new AtomicReference[ClusterResource]
-  }
-
-  class MockBrokerMetricsReporter extends MockMetricsReporter with 
ClusterResourceListener {
-
-    override def onUpdate(clusterMetadata: ClusterResource): Unit = {
-      MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata)
-    }
-  }
-}
-
-class EndToEndClusterIdTest extends KafkaServerTestHarness {
-
-  import EndToEndClusterIdTest._
-
-  val producerCount = 1
-  val consumerCount = 1
-  val serverCount = 1
-  lazy val producerConfig = new Properties
-  lazy val consumerConfig = new Properties
-  lazy val serverConfig = new Properties
-  val numRecords = 1
-  val topic = "e2etopic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
-  this.serverConfig.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
classOf[MockBrokerMetricsReporter].getName)
-
-  override def generateConfigs = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, 
interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
-    cfgs.foreach(_ ++= serverConfig)
-    cfgs.map(KafkaConfig.fromProps)
-  }
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    MockDeserializer.resetStaticVariables()
-    // create the consumer offset topic
-    createTopic(topic, 2, serverCount)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testEndToEnd(groupProtocol: String): Unit = {
-    val appendStr = "mock"
-    MockConsumerInterceptor.resetCounters()
-    MockProducerInterceptor.resetCounters()
-
-    assertNotNull(MockBrokerMetricsReporter.CLUSTER_META)
-    isValidClusterId(MockBrokerMetricsReporter.CLUSTER_META.get.clusterId)
-
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
-    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
classOf[MockProducerInterceptor].getName)
-    producerProps.put("mock.interceptor.append", appendStr)
-    producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
classOf[MockProducerMetricsReporter].getName)
-    val testProducer = new KafkaProducer(producerProps, new MockSerializer, 
new MockSerializer)
-
-    // Send one record and make sure clusterId is set after send and before 
onAcknowledgement
-    sendRecords(testProducer, 1, tp)
-    
assertNotEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT, 
MockProducerInterceptor.NO_CLUSTER_ID)
-    assertNotNull(MockProducerInterceptor.CLUSTER_META)
-    
assertEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get.clusterId,
 MockProducerInterceptor.CLUSTER_META.get.clusterId)
-    isValidClusterId(MockProducerInterceptor.CLUSTER_META.get.clusterId)
-
-    // Make sure that serializer gets the cluster id before serialize method.
-    assertNotEquals(MockSerializer.CLUSTER_ID_BEFORE_SERIALIZE, 
MockSerializer.NO_CLUSTER_ID)
-    assertNotNull(MockSerializer.CLUSTER_META)
-    isValidClusterId(MockSerializer.CLUSTER_META.get.clusterId)
-
-    assertNotNull(MockProducerMetricsReporter.CLUSTER_META)
-    isValidClusterId(MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
-
-    this.consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
-    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
classOf[MockConsumerInterceptor].getName)
-    this.consumerConfig.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
classOf[MockConsumerMetricsReporter].getName)
-    this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
-    val testConsumer = new KafkaConsumer(this.consumerConfig, new 
MockDeserializer, new MockDeserializer)
-    testConsumer.assign(java.util.List.of(tp))
-    testConsumer.seek(tp, 0)
-
-    // consume and verify that values are modified by interceptors
-    consumeRecords(testConsumer, numRecords)
-
-    // Check that cluster id is present after the first poll call.
-    assertNotEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME, 
MockConsumerInterceptor.NO_CLUSTER_ID)
-    assertNotNull(MockConsumerInterceptor.CLUSTER_META)
-    isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get.clusterId)
-    
assertEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get.clusterId,
 MockConsumerInterceptor.CLUSTER_META.get.clusterId)
-
-    assertNotEquals(MockDeserializer.clusterIdBeforeDeserialize, 
MockDeserializer.noClusterId)
-    assertNotNull(MockDeserializer.clusterMeta)
-    isValidClusterId(MockDeserializer.clusterMeta.get.clusterId)
-    assertEquals(MockDeserializer.clusterIdBeforeDeserialize.get.clusterId, 
MockDeserializer.clusterMeta.get.clusterId)
-
-    assertNotNull(MockConsumerMetricsReporter.CLUSTER_META)
-    isValidClusterId(MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
-
-    // Make sure everyone receives the same cluster id.
-    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, 
MockSerializer.CLUSTER_META.get.clusterId)
-    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, 
MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
-    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, 
MockConsumerInterceptor.CLUSTER_META.get.clusterId)
-    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, 
MockDeserializer.clusterMeta.get.clusterId)
-    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, 
MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
-    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, 
MockBrokerMetricsReporter.CLUSTER_META.get.clusterId)
-
-    testConsumer.close()
-    testProducer.close()
-    MockConsumerInterceptor.resetCounters()
-    MockProducerInterceptor.resetCounters()
-  }
-
-  private def sendRecords(producer: KafkaProducer[String, String], numRecords: 
Int, tp: TopicPartition): Unit = {
-    val futures = (0 until numRecords).map { i =>
-      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i", s"$i")
-      debug(s"Sending this record: $record")
-      producer.send(record)
-    }
-    try {
-      futures.foreach(_.get)
-    } catch {
-      case e: ExecutionException => throw e.getCause
-    }
-  }
-
-  private def consumeRecords(consumer: Consumer[String, String],
-                             numRecords: Int,
-                             startingOffset: Int = 0,
-                             topic: String = topic,
-                             part: Int = part): Unit = {
-    val records = TestUtils.consumeRecords(consumer, numRecords)
-
-    for (i <- 0 until numRecords) {
-      val record = records(i)
-      val offset = startingOffset + i
-      assertEquals(topic, record.topic)
-      assertEquals(part, record.partition)
-      assertEquals(offset.toLong, record.offset)
-    }
-  }
-}
diff --git 
a/server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java 
b/server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java
new file mode 100644
index 00000000000..6c48f138a3f
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.api;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockDeserializer;
+import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockProducerInterceptor;
+import org.apache.kafka.test.MockSerializer;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.kafka.test.TestUtils.isValidClusterId;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** The test cases here verify the following conditions.
+ * 1. The ProducerInterceptor receives the cluster id after the onSend() 
method is called and before onAcknowledgement() method is called.
+ * 2. The Serializer receives the cluster id before the serialize() method is 
called.
+ * 3. The producer MetricReporter receives the cluster id after send() method 
is called on KafkaProducer.
+ * 4. The ConsumerInterceptor receives the cluster id before the onConsume() 
method.
+ * 5. The Deserializer receives the cluster id before the deserialize() method 
is called.
+ * 6. The consumer MetricReporter receives the cluster id after poll() is 
called on KafkaConsumer.
+ * 7. The broker MetricReporter receives the cluster id after the broker 
startup is over.
+ * 8. The controller MetricReporter receives the cluster id after the controller 
startup is over.
+ * 9. All the components receive the same cluster id.
+ */
+@ClusterTestDefaults(serverProperties = {
+    @ClusterConfigProperty(key = MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
value = "org.apache.kafka.api.EndToEndClusterIdTest$MockCommonMetricsReporter"),
+    @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = 
"1"),
+})
+public class EndToEndClusterIdTest {
+
+    private static final String TOPIC = "e2etopic";
+    private static final int PARTITION = 0;
+    private static final TopicPartition TP = new TopicPartition(TOPIC, 
PARTITION);
+    private final ClusterInstance clusterInstance;
+    private String clusterBrokerId;
+    private String controllerId;
+    private static final String PRODUCER_CLIENT_ID = "producerClientId";
+    private static final String CONSUMER_CLIENT_ID = "consumerClientId";
+
+    EndToEndClusterIdTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        this.clusterInstance.createTopic(TOPIC, 2, (short) 1);
+        clusterBrokerId = 
String.valueOf(clusterInstance.brokerIds().iterator().next());
+        controllerId = 
String.valueOf(clusterInstance.controllerIds().iterator().next());
+        MockDeserializer.resetStaticVariables();
+    }
+
+    public static class MockCommonMetricsReporter extends MockMetricsReporter 
implements ClusterResourceListener {
+        public static final Map<String, ClusterResource> CLUSTER_RESOURCE_MAP 
= new ConcurrentHashMap<>();
+        public String brokerId;
+        public String controllerId;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            super.configure(configs);
+
+            String roles = (String) configs.get("process.roles");
+            if (roles == null) return;
+
+            String id = (String) configs.get(ServerConfigs.BROKER_ID_CONFIG);
+            controllerId = roles.contains("controller") ? id : null;
+            brokerId    = roles.contains("broker")    ? id : null;
+        }
+
+        @Override
+        public void onUpdate(ClusterResource clusterMetadata) {
+            if (clientId != null) CLUSTER_RESOURCE_MAP.put(clientId, 
clusterMetadata);
+            if (brokerId != null) CLUSTER_RESOURCE_MAP.put(brokerId, 
clusterMetadata);
+            if (controllerId != null) CLUSTER_RESOURCE_MAP.put(controllerId, 
clusterMetadata);
+        }
+    }
+
+    @ClusterTest
+    public void testEndToEndWithClassicProtocol() throws Exception {
+        testEndToEnd(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testEndToEndWithConsumerProtocol() throws Exception {
+        testEndToEnd(GroupProtocol.CONSUMER);
+    }
+
+    public void testEndToEnd(GroupProtocol groupProtocol) throws Exception {
+        MockConsumerInterceptor.resetCounters();
+        MockProducerInterceptor.resetCounters();
+
+        ClusterResource brokerClusterResource = 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(clusterBrokerId);
+        assertNotNull(brokerClusterResource);
+        isValidClusterId(brokerClusterResource.clusterId());
+        ClusterResource controllerClusterResource = 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(controllerId);
+        assertNotNull(controllerClusterResource);
+        isValidClusterId(controllerClusterResource.clusterId());
+
+        Map<String, Object> producerConfig = 
Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockProducerInterceptor.class.getName(),
+            "mock.interceptor.append", "mock",
+            ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockCommonMetricsReporter.class.getName(),
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
MockSerializer.class.getName(),
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
MockSerializer.class.getName(),
+            ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID);
+        try (var producer = clusterInstance.<String, 
String>producer(producerConfig)) {
+            // Send one record and make sure clusterId is set after sending 
and before onAcknowledgement
+            sendRecord(producer);
+        }
+        
assertNotEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get(),
 MockProducerInterceptor.NO_CLUSTER_ID);
+        assertNotNull(MockProducerInterceptor.CLUSTER_META.get());
+        assertEquals(
+            
MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get().clusterId(),
+            MockProducerInterceptor.CLUSTER_META.get().clusterId()
+        );
+        
isValidClusterId(MockProducerInterceptor.CLUSTER_META.get().clusterId());
+
+        // Make sure the serializer sees Cluster ID before serialize method
+        assertNotEquals(MockSerializer.CLUSTER_ID_BEFORE_SERIALIZE.get(), 
MockSerializer.NO_CLUSTER_ID);
+        assertNotNull(MockSerializer.CLUSTER_META.get());
+        isValidClusterId(MockSerializer.CLUSTER_META.get().clusterId());
+
+        ClusterResource producerClusterResource = 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(PRODUCER_CLIENT_ID);
+        assertNotNull(producerClusterResource);
+        isValidClusterId(producerClusterResource.clusterId());
+
+        Map<String, Object> consumerConfig = 
Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName(),
+            ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockCommonMetricsReporter.class.getName(),
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
MockDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
MockDeserializer.class.getName(),
+            ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(),
+            ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID);
+        try (var consumer = clusterInstance.<String, 
String>consumer(consumerConfig)) {
+            consumer.assign(List.of(TP));
+            consumer.seek(TP, 0);
+            // consume and verify that values are modified by interceptors
+            consumeRecord(consumer);
+        }
+
+        // Check that cluster id is present after the first poll call.
+        
assertNotEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get(), 
MockConsumerInterceptor.NO_CLUSTER_ID);
+        assertNotNull(MockConsumerInterceptor.CLUSTER_META.get());
+        
isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get().clusterId());
+        assertEquals(
+            
MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get().clusterId(),
+            MockConsumerInterceptor.CLUSTER_META.get().clusterId()
+        );
+
+        assertNotEquals(MockDeserializer.clusterIdBeforeDeserialize.get(), 
MockDeserializer.noClusterId);
+        assertNotNull(MockDeserializer.clusterMeta);
+        isValidClusterId(MockDeserializer.clusterMeta.get().clusterId());
+        assertEquals(
+            MockDeserializer.clusterIdBeforeDeserialize.get().clusterId(),
+            MockDeserializer.clusterMeta.get().clusterId()
+        );
+
+        ClusterResource consumerClusterResource = 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(CONSUMER_CLIENT_ID);
+        assertNotNull(consumerClusterResource);
+        isValidClusterId(consumerClusterResource.clusterId());
+
+        // Make sure everyone receives the same cluster id.
+        String id = MockProducerInterceptor.CLUSTER_META.get().clusterId();
+        assertEquals(id, MockSerializer.CLUSTER_META.get().clusterId());
+        assertEquals(id, 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(PRODUCER_CLIENT_ID).clusterId());
+        assertEquals(id, 
MockConsumerInterceptor.CLUSTER_META.get().clusterId());
+        assertEquals(id, MockDeserializer.clusterMeta.get().clusterId());
+        assertEquals(id, 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(CONSUMER_CLIENT_ID).clusterId());
+        assertEquals(id, 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(clusterBrokerId).clusterId());
+        assertEquals(id, 
MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(controllerId).clusterId());
+
+        MockConsumerInterceptor.resetCounters();
+        MockProducerInterceptor.resetCounters();
+    }
+
+    private static void sendRecord(Producer<String, String> producer) throws 
Exception {
+        ProducerRecord<String, String> record = new 
ProducerRecord<>(TP.topic(), TP.partition(), "0", "0");
+        producer.send(record).get();
+    }
+
+    private void consumeRecord(Consumer<String, String> consumer) throws 
InterruptedException {
+        List<ConsumerRecord<String, String>> records = new ArrayList<>();
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100)).forEach(records::add);
+            return !records.isEmpty();
+        }, 60000, "Timed out before consuming expected record.");
+
+        ConsumerRecord<String, String> record = records.get(0);
+        assertEquals(TOPIC, record.topic());
+        assertEquals(PARTITION, record.partition());
+        assertEquals(0, record.offset());
+    }
+}
\ No newline at end of file

Reply via email to