This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new aafc1ae27e0 KAFKA-19056 Rewrite EndToEndClusterIdTest in Java and move it to the server module (#19741) aafc1ae27e0 is described below commit aafc1ae27e07cc1891db970871de296836094692 Author: Ming-Yen Chung <mingyen...@gmail.com> AuthorDate: Thu May 29 19:08:05 2025 +0800 KAFKA-19056 Rewrite EndToEndClusterIdTest in Java and move it to the server module (#19741) Use ClusterTest and java to rewrite `EndToEndClusterIdTest` and move it to the server module Reviewers: Ken Huang <s7133...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/api/EndToEndClusterIdTest.scala | 217 ------------------- .../apache/kafka/api/EndToEndClusterIdTest.java | 231 +++++++++++++++++++++ 2 files changed, 231 insertions(+), 217 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala deleted file mode 100644 index 3b49a7a196b..00000000000 --- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.api - -import java.util.concurrent.ExecutionException -import java.util.concurrent.atomic.AtomicReference -import java.util.Properties -import kafka.integration.KafkaServerTestHarness -import kafka.server._ -import kafka.utils._ -import kafka.utils.Implicits._ -import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition} -import org.apache.kafka.server.metrics.MetricConfigs -import org.apache.kafka.test.{TestUtils => _, _} -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{BeforeEach, TestInfo} - -import org.apache.kafka.test.TestUtils.isValidClusterId -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -/** The test cases here verify the following conditions. - * 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called. - * 2. The Serializer receives the cluster id before the serialize() method is called. - * 3. The producer MetricReporter receives the cluster id after send() method is called on KafkaProducer. - * 4. The ConsumerInterceptor receives the cluster id before the onConsume() method. - * 5. The Deserializer receives the cluster id before the deserialize() method is called. - * 6. The consumer MetricReporter receives the cluster id after poll() is called on KafkaConsumer. - * 7. The broker MetricReporter receives the cluster id after the broker startup is over. - * 8. The broker KafkaMetricReporter receives the cluster id after the broker startup is over. - * 9. All the components receive the same cluster id. 
- */ - -object EndToEndClusterIdTest { - - object MockConsumerMetricsReporter { - val CLUSTER_META = new AtomicReference[ClusterResource] - } - - class MockConsumerMetricsReporter extends MockMetricsReporter with ClusterResourceListener { - - override def onUpdate(clusterMetadata: ClusterResource): Unit = { - MockConsumerMetricsReporter.CLUSTER_META.set(clusterMetadata) - } - } - - object MockProducerMetricsReporter { - val CLUSTER_META = new AtomicReference[ClusterResource] - } - - class MockProducerMetricsReporter extends MockMetricsReporter with ClusterResourceListener { - - override def onUpdate(clusterMetadata: ClusterResource): Unit = { - MockProducerMetricsReporter.CLUSTER_META.set(clusterMetadata) - } - } - - object MockBrokerMetricsReporter { - val CLUSTER_META = new AtomicReference[ClusterResource] - } - - class MockBrokerMetricsReporter extends MockMetricsReporter with ClusterResourceListener { - - override def onUpdate(clusterMetadata: ClusterResource): Unit = { - MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata) - } - } -} - -class EndToEndClusterIdTest extends KafkaServerTestHarness { - - import EndToEndClusterIdTest._ - - val producerCount = 1 - val consumerCount = 1 - val serverCount = 1 - lazy val producerConfig = new Properties - lazy val consumerConfig = new Properties - lazy val serverConfig = new Properties - val numRecords = 1 - val topic = "e2etopic" - val part = 0 - val tp = new TopicPartition(topic, part) - this.serverConfig.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockBrokerMetricsReporter].getName) - - override def generateConfigs = { - val cfgs = TestUtils.createBrokerConfigs(serverCount, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) - cfgs.foreach(_ ++= serverConfig) - cfgs.map(KafkaConfig.fromProps) - } - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - super.setUp(testInfo) - 
MockDeserializer.resetStaticVariables() - // create the consumer offset topic - createTopic(topic, 2, serverCount) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testEndToEnd(groupProtocol: String): Unit = { - val appendStr = "mock" - MockConsumerInterceptor.resetCounters() - MockProducerInterceptor.resetCounters() - - assertNotNull(MockBrokerMetricsReporter.CLUSTER_META) - isValidClusterId(MockBrokerMetricsReporter.CLUSTER_META.get.clusterId) - - val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockProducerInterceptor].getName) - producerProps.put("mock.interceptor.append", appendStr) - producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockProducerMetricsReporter].getName) - val testProducer = new KafkaProducer(producerProps, new MockSerializer, new MockSerializer) - - // Send one record and make sure clusterId is set after send and before onAcknowledgement - sendRecords(testProducer, 1, tp) - assertNotEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT, MockProducerInterceptor.NO_CLUSTER_ID) - assertNotNull(MockProducerInterceptor.CLUSTER_META) - assertEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get.clusterId, MockProducerInterceptor.CLUSTER_META.get.clusterId) - isValidClusterId(MockProducerInterceptor.CLUSTER_META.get.clusterId) - - // Make sure that serializer gets the cluster id before serialize method. 
- assertNotEquals(MockSerializer.CLUSTER_ID_BEFORE_SERIALIZE, MockSerializer.NO_CLUSTER_ID) - assertNotNull(MockSerializer.CLUSTER_META) - isValidClusterId(MockSerializer.CLUSTER_META.get.clusterId) - - assertNotNull(MockProducerMetricsReporter.CLUSTER_META) - isValidClusterId(MockProducerMetricsReporter.CLUSTER_META.get.clusterId) - - this.consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockConsumerInterceptor].getName) - this.consumerConfig.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockConsumerMetricsReporter].getName) - this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) - val testConsumer = new KafkaConsumer(this.consumerConfig, new MockDeserializer, new MockDeserializer) - testConsumer.assign(java.util.List.of(tp)) - testConsumer.seek(tp, 0) - - // consume and verify that values are modified by interceptors - consumeRecords(testConsumer, numRecords) - - // Check that cluster id is present after the first poll call. - assertNotEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME, MockConsumerInterceptor.NO_CLUSTER_ID) - assertNotNull(MockConsumerInterceptor.CLUSTER_META) - isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get.clusterId) - assertEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId) - - assertNotEquals(MockDeserializer.clusterIdBeforeDeserialize, MockDeserializer.noClusterId) - assertNotNull(MockDeserializer.clusterMeta) - isValidClusterId(MockDeserializer.clusterMeta.get.clusterId) - assertEquals(MockDeserializer.clusterIdBeforeDeserialize.get.clusterId, MockDeserializer.clusterMeta.get.clusterId) - - assertNotNull(MockConsumerMetricsReporter.CLUSTER_META) - isValidClusterId(MockConsumerMetricsReporter.CLUSTER_META.get.clusterId) - - // Make sure everyone receives the same cluster id. 
- assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockSerializer.CLUSTER_META.get.clusterId) - assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockProducerMetricsReporter.CLUSTER_META.get.clusterId) - assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId) - assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockDeserializer.clusterMeta.get.clusterId) - assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerMetricsReporter.CLUSTER_META.get.clusterId) - assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockBrokerMetricsReporter.CLUSTER_META.get.clusterId) - - testConsumer.close() - testProducer.close() - MockConsumerInterceptor.resetCounters() - MockProducerInterceptor.resetCounters() - } - - private def sendRecords(producer: KafkaProducer[String, String], numRecords: Int, tp: TopicPartition): Unit = { - val futures = (0 until numRecords).map { i => - val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i", s"$i") - debug(s"Sending this record: $record") - producer.send(record) - } - try { - futures.foreach(_.get) - } catch { - case e: ExecutionException => throw e.getCause - } - } - - private def consumeRecords(consumer: Consumer[String, String], - numRecords: Int, - startingOffset: Int = 0, - topic: String = topic, - part: Int = part): Unit = { - val records = TestUtils.consumeRecords(consumer, numRecords) - - for (i <- 0 until numRecords) { - val record = records(i) - val offset = startingOffset + i - assertEquals(topic, record.topic) - assertEquals(part, record.partition) - assertEquals(offset.toLong, record.offset) - } - } -} diff --git a/server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java b/server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java new file mode 100644 index 00000000000..6c48f138a3f --- /dev/null +++ 
b/server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.api; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.test.MockConsumerInterceptor; +import org.apache.kafka.test.MockDeserializer; +import org.apache.kafka.test.MockMetricsReporter; +import 
org.apache.kafka.test.MockProducerInterceptor; +import org.apache.kafka.test.MockSerializer; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.BeforeEach; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.kafka.test.TestUtils.isValidClusterId; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** The test cases here verify the following conditions. + * 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called. + * 2. The Serializer receives the cluster id before the serialize() method is called. + * 3. The producer MetricReporter receives the cluster id after send() method is called on KafkaProducer. + * 4. The ConsumerInterceptor receives the cluster id before the onConsume() method is called. + * 5. The Deserializer receives the cluster id before the deserialize() method is called. + * 6. The consumer MetricReporter receives the cluster id after poll() is called on KafkaConsumer. + * 7. The broker MetricReporter receives the cluster id after the broker startup is over. + * 8. The controller MetricReporter receives the cluster id after the controller startup is over. + * 9. All the components receive the same cluster id. 
+ */ +@ClusterTestDefaults(serverProperties = { + @ClusterConfigProperty(key = MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.api.EndToEndClusterIdTest$MockCommonMetricsReporter"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class EndToEndClusterIdTest { + + private static final String TOPIC = "e2etopic"; + private static final int PARTITION = 0; + private static final TopicPartition TP = new TopicPartition(TOPIC, PARTITION); + private final ClusterInstance clusterInstance; + private String clusterBrokerId; + private String controllerId; + private static final String PRODUCER_CLIENT_ID = "producerClientId"; + private static final String CONSUMER_CLIENT_ID = "consumerClientId"; + + EndToEndClusterIdTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + this.clusterInstance.createTopic(TOPIC, 2, (short) 1); + clusterBrokerId = String.valueOf(clusterInstance.brokerIds().iterator().next()); + controllerId = String.valueOf(clusterInstance.controllerIds().iterator().next()); + MockDeserializer.resetStaticVariables(); + } + + public static class MockCommonMetricsReporter extends MockMetricsReporter implements ClusterResourceListener { + public static final Map<String, ClusterResource> CLUSTER_RESOURCE_MAP = new ConcurrentHashMap<>(); + public String brokerId; + public String controllerId; + + @Override + public void configure(Map<String, ?> configs) { + super.configure(configs); + + String roles = (String) configs.get("process.roles"); + if (roles == null) return; + + String id = (String) configs.get(ServerConfigs.BROKER_ID_CONFIG); + controllerId = roles.contains("controller") ? id : null; + brokerId = roles.contains("broker") ? 
id : null; + } + + @Override + public void onUpdate(ClusterResource clusterMetadata) { + if (clientId != null) CLUSTER_RESOURCE_MAP.put(clientId, clusterMetadata); + if (brokerId != null) CLUSTER_RESOURCE_MAP.put(brokerId, clusterMetadata); + if (controllerId != null) CLUSTER_RESOURCE_MAP.put(controllerId, clusterMetadata); + } + } + + @ClusterTest + public void testEndToEndWithClassicProtocol() throws Exception { + testEndToEnd(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testEndToEndWithConsumerProtocol() throws Exception { + testEndToEnd(GroupProtocol.CONSUMER); + } + + public void testEndToEnd(GroupProtocol groupProtocol) throws Exception { + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + + ClusterResource brokerClusterResource = MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(clusterBrokerId); + assertNotNull(brokerClusterResource); + isValidClusterId(brokerClusterResource.clusterId()); + ClusterResource controllerClusterResource = MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(controllerId); + assertNotNull(controllerClusterResource); + isValidClusterId(controllerClusterResource.clusterId()); + + Map<String, Object> producerConfig = Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + "mock.interceptor.append", "mock", + ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockCommonMetricsReporter.class.getName(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MockSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MockSerializer.class.getName(), + ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID); + try (var producer = clusterInstance.<String, String>producer(producerConfig)) { + // Send one record and make sure clusterId is set after sending and before onAcknowledgement + sendRecord(producer); + } + assertNotEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get(), MockProducerInterceptor.NO_CLUSTER_ID); + 
assertNotNull(MockProducerInterceptor.CLUSTER_META.get()); + assertEquals( + MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get().clusterId(), + MockProducerInterceptor.CLUSTER_META.get().clusterId() + ); + isValidClusterId(MockProducerInterceptor.CLUSTER_META.get().clusterId()); + + // Make sure the serializer sees Cluster ID before serialize method + assertNotEquals(MockSerializer.CLUSTER_ID_BEFORE_SERIALIZE.get(), MockSerializer.NO_CLUSTER_ID); + assertNotNull(MockSerializer.CLUSTER_META.get()); + isValidClusterId(MockSerializer.CLUSTER_META.get().clusterId()); + + ClusterResource producerClusterResource = MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(PRODUCER_CLIENT_ID); + assertNotNull(producerClusterResource); + isValidClusterId(producerClusterResource.clusterId()); + + Map<String, Object> consumerConfig = Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName(), + ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockCommonMetricsReporter.class.getName(), + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MockDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MockDeserializer.class.getName(), + ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(), + ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID); + try (var consumer = clusterInstance.<String, String>consumer(consumerConfig)) { + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + // consume and verify that values are modified by interceptors + consumeRecord(consumer); + } + + // Check that cluster id is present after the first poll call. 
+ assertNotEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get(), MockConsumerInterceptor.NO_CLUSTER_ID); + assertNotNull(MockConsumerInterceptor.CLUSTER_META.get()); + isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get().clusterId()); + assertEquals( + MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get().clusterId(), + MockConsumerInterceptor.CLUSTER_META.get().clusterId() + ); + + assertNotEquals(MockDeserializer.clusterIdBeforeDeserialize.get(), MockDeserializer.noClusterId); + assertNotNull(MockDeserializer.clusterMeta); + isValidClusterId(MockDeserializer.clusterMeta.get().clusterId()); + assertEquals( + MockDeserializer.clusterIdBeforeDeserialize.get().clusterId(), + MockDeserializer.clusterMeta.get().clusterId() + ); + + ClusterResource consumerClusterResource = MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(CONSUMER_CLIENT_ID); + assertNotNull(consumerClusterResource); + isValidClusterId(consumerClusterResource.clusterId()); + + // Make sure everyone receives the same cluster id. 
+ String id = MockProducerInterceptor.CLUSTER_META.get().clusterId(); + assertEquals(id, MockSerializer.CLUSTER_META.get().clusterId()); + assertEquals(id, MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(PRODUCER_CLIENT_ID).clusterId()); + assertEquals(id, MockConsumerInterceptor.CLUSTER_META.get().clusterId()); + assertEquals(id, MockDeserializer.clusterMeta.get().clusterId()); + assertEquals(id, MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(CONSUMER_CLIENT_ID).clusterId()); + assertEquals(id, MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(clusterBrokerId).clusterId()); + assertEquals(id, MockCommonMetricsReporter.CLUSTER_RESOURCE_MAP.get(controllerId).clusterId()); + + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + } + + private static void sendRecord(Producer<String, String> producer) throws Exception { + ProducerRecord<String, String> record = new ProducerRecord<>(TP.topic(), TP.partition(), "0", "0"); + producer.send(record).get(); + } + + private void consumeRecord(Consumer<String, String> consumer) throws InterruptedException { + List<ConsumerRecord<String, String>> records = new ArrayList<>(); + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100)).forEach(records::add); + return !records.isEmpty(); + }, 60000, "Timed out before consuming expected record."); + + ConsumerRecord<String, String> record = records.get(0); + assertEquals(TOPIC, record.topic()); + assertEquals(PARTITION, record.partition()); + assertEquals(0, record.offset()); + } +} \ No newline at end of file