This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b72b0abb1c MINOR: Optimize runtime of MM2 integration tests by
batching transactions (#13816)
3b72b0abb1c is described below
commit 3b72b0abb1c38828555a33096a7431616be7b8c4
Author: Greg Harris <[email protected]>
AuthorDate: Wed Jun 21 14:51:54 2023 -0400
MINOR: Optimize runtime of MM2 integration tests by batching transactions
(#13816)
Reviewers: Chris Egerton <[email protected]>
---
.../org/apache/kafka/connect/mirror/TestUtils.java | 42 ++++++--
.../MirrorConnectorsIntegrationBaseTest.java | 114 ++++++++++++---------
...irrorConnectorsIntegrationTransactionsTest.java | 36 ++++---
...rsWithCustomForwardingAdminIntegrationTest.java | 12 +--
4 files changed, 128 insertions(+), 76 deletions(-)
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
index ebe77fddf92..6c42972a107 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
@@ -16,7 +16,11 @@
*/
package org.apache.kafka.connect.mirror;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,14 +38,40 @@ public class TestUtils {
}
return props;
}
-
- /*
- * return records with different but predictable key and value
+
+ /**
+ * Assemble a collection of records arbitrarily distributed across all
partitions of the specified topic
+ * @param topicName Destination topic
+ * @param numRecords count of records to produce to the topic in total
+ * @return A batch of records that can be sent to a producer.
*/
- public static Map<String, String> generateRecords(int numRecords) {
- Map<String, String> records = new HashMap<>();
+ public static List<ProducerRecord<byte[], byte[]>> generateRecords(String
topicName, int numRecords) {
+ List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
- records.put("key-" + i, "message-" + i);
+ String key = "key-" + i;
+ String value = "message-" + i;
+ records.add(new ProducerRecord<>(topicName, null, key.getBytes(),
value.getBytes()));
+ }
+ return records;
+ }
+
+ /**
+ * Assemble a collection of records evenly distributed across some
partitions of the specified topic
+ * @param topicName Destination topic
+ * @param numRecords count of records to produce to each partition
+ * @param numPartitions number of partitions within the topic to write
records to.
+ * @return A batch of records that can be sent to a producer.
+ */
+ public static List<ProducerRecord<byte[], byte[]>> generateRecords(String
topicName, int numRecords, int numPartitions) {
+ List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ int cnt = 0;
+ for (int r = 0; r < numRecords; r++) {
+ for (int p = 0; p < numPartitions; p++) {
+ String value = "value-" + cnt;
+ String key = "key-" + cnt;
+ cnt++;
+ records.add(new ProducerRecord<>(topicName, p, key.getBytes(),
value.getBytes()));
+ }
}
return records;
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index be99d58b5d7..bc5e0212bbf 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -28,10 +28,16 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
import org.apache.kafka.connect.mirror.MirrorClient;
@@ -41,6 +47,7 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.TestUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -55,7 +62,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -75,8 +85,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
-
/**
* Tests MM2 replication and failover/failback logic.
*
@@ -93,6 +101,7 @@ public class MirrorConnectorsIntegrationBaseTest {
protected static final int NUM_PARTITIONS = 10;
protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS *
NUM_RECORDS_PER_PARTITION;
protected static final int OFFSET_LAG_MAX = 10;
+ protected static final int RECORD_PRODUCE_DURATION_MS = 20_000;
protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
protected static final int CHECKPOINT_DURATION_MS = 20_000;
private static final int RECORD_CONSUME_DURATION_MS = 20_000;
@@ -114,6 +123,8 @@ public class MirrorConnectorsIntegrationBaseTest {
protected MirrorMakerConfig mm2Config;
protected EmbeddedConnectCluster primary;
protected EmbeddedConnectCluster backup;
+ protected Producer<byte[], byte[]> primaryProducer;
+ protected Producer<byte[], byte[]> backupProducer;
protected Map<String, String> additionalPrimaryClusterClientsConfigs = new
HashMap<>();
protected Map<String, String> additionalBackupClusterClientsConfigs = new
HashMap<>();
@@ -212,6 +223,9 @@ public class MirrorConnectorsIntegrationBaseTest {
backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not
start in time.");
+ primaryProducer = initializeProducer(primary);
+ backupProducer = initializeProducer(backup);
+
createTopics();
waitForTopicCreated(backup, "mm2-status.primary.internal");
@@ -234,6 +248,8 @@ public class MirrorConnectorsIntegrationBaseTest {
@AfterEach
public void shutdownClusters() throws Exception {
try {
+ Utils.closeQuietly(primaryProducer, "primary producer");
+ Utils.closeQuietly(backupProducer, "backup producer");
for (String x : primary.connectors()) {
primary.deleteConnector(x);
}
@@ -259,10 +275,10 @@ public class MirrorConnectorsIntegrationBaseTest {
@Test
public void testReplication() throws Exception {
- produceMessages(primary, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
String backupTopic1 = remoteTopicName("test-topic-1",
PRIMARY_CLUSTER_ALIAS);
if (replicateBackupToPrimary) {
- produceMessages(backup, "test-topic-1");
+ produceMessages(backupProducer, "test-topic-1");
}
String reverseTopic1 = remoteTopicName("test-topic-1",
BACKUP_CLUSTER_ALIAS);
String consumerGroupName = "consumer-group-testReplication";
@@ -370,7 +386,7 @@ public class MirrorConnectorsIntegrationBaseTest {
waitForTopicCreated(backup, backupTopic2);
// only produce messages to the first partition
- produceMessages(primary, "test-topic-2", 1);
+ produceMessages(primaryProducer, "test-topic-2", 1);
// expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
assertEquals(NUM_RECORDS_PER_PARTITION,
primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS,
"test-topic-2").count(),
@@ -382,7 +398,7 @@ public class MirrorConnectorsIntegrationBaseTest {
backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
String reverseTopic3 = remoteTopicName("test-topic-3",
BACKUP_CLUSTER_ALIAS);
waitForTopicCreated(primary, reverseTopic3);
- produceMessages(backup, "test-topic-3", 1);
+ produceMessages(backupProducer, "test-topic-3", 1);
assertEquals(NUM_RECORDS_PER_PARTITION,
backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS,
"test-topic-3").count(),
"Records were not produced to backup cluster.");
@@ -401,8 +417,8 @@ public class MirrorConnectorsIntegrationBaseTest {
primary.kafka().createTopic(topic, NUM_PARTITIONS);
// produce to all test-topic-empty's partitions, except the last
partition
- produceMessages(primary, topic, NUM_PARTITIONS - 1);
-
+ produceMessages(primaryProducer, topic, NUM_PARTITIONS - 1);
+
// consume before starting the connectors, so we don't need to wait
for discovery
int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
@@ -449,7 +465,7 @@ public class MirrorConnectorsIntegrationBaseTest {
}
private void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws
InterruptedException {
- produceMessages(primary, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
String backupTopic1 = remoteTopicName("test-topic-1",
PRIMARY_CLUSTER_ALIAS);
String consumerGroupName =
"consumer-group-testOneWayReplicationWithAutoOffsetSync";
Map<String, Object> consumerProps = new HashMap<String, Object>() {{
@@ -497,7 +513,7 @@ public class MirrorConnectorsIntegrationBaseTest {
waitForTopicCreated(backup, remoteTopic2);
// produce some records to the new topic in primary cluster
- produceMessages(primary, "test-topic-2");
+ produceMessages(primaryProducer, "test-topic-2");
// create a consumer at primary cluster to consume the new topic
try (Consumer<byte[], byte[]> consumer1 =
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
@@ -535,7 +551,7 @@ public class MirrorConnectorsIntegrationBaseTest {
String consumerGroupName = "consumer-group-syncs-on-target";
Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
- produceMessages(primary, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
warmUpConsumer(consumerProps);
@@ -571,7 +587,7 @@ public class MirrorConnectorsIntegrationBaseTest {
Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
// ensure there are some records in the topic on the source cluster
- produceMessages(primary, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
// warm up consumers before starting the connectors, so we don't need
to wait for discovery
warmUpConsumer(consumerProps);
@@ -609,7 +625,7 @@ public class MirrorConnectorsIntegrationBaseTest {
}, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to
backup cluster");
// Send some records to test-topic-no-checkpoints in the source cluster
- produceMessages(primary, "test-topic-no-checkpoints");
+ produceMessages(primaryProducer, "test-topic-no-checkpoints");
try (Consumer<byte[], byte[]> consumer =
primary.kafka().createConsumer(consumerProps)) {
Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
@@ -641,7 +657,7 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
- produceMessages(primary, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
}
@@ -649,7 +665,7 @@ public class MirrorConnectorsIntegrationBaseTest {
restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
Thread.sleep(5000);
- produceMessages(primary, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
}
@@ -668,19 +684,22 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+ // Produce and consume an initial batch of records to establish an
initial checkpoint
+ produceMessages(primaryProducer, "test-topic-1");
+ warmUpConsumer(consumerProps);
+ MirrorClient backupClient = new
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+ Map<TopicPartition, OffsetAndMetadata> initialCheckpoints =
waitForCheckpointOnAllPartitions(
+ backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS,
remoteTopic);
// Produce a large number of records to the topic, all replicated
within one MM2 lifetime.
int iterations = 100;
- for (int i = 0; i < iterations; i++) {
- produceMessages(primary, "test-topic-1");
+ for (int i = 1; i < iterations; i++) {
+ produceMessages(primaryProducer, "test-topic-1");
}
waitForTopicCreated(backup, remoteTopic);
assertEquals(iterations * NUM_RECORDS_PRODUCED,
backup.kafka().consume(iterations * NUM_RECORDS_PRODUCED,
RECORD_TRANSFER_DURATION_MS, remoteTopic).count(),
"Records were not replicated to backup cluster.");
- // Once the replication has finished, we spin up the upstream consumer
and start slowly consuming records
+ // Once the replication has finished, we spin up the upstream consumer
again and start consuming records
ConsumerRecords<byte[], byte[]> allRecords =
primary.kafka().consume(iterations * NUM_RECORDS_PRODUCED,
RECORD_CONSUME_DURATION_MS, "test-topic-1");
- MirrorClient backupClient = new
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
- Map<TopicPartition, OffsetAndMetadata> initialCheckpoints =
waitForCheckpointOnAllPartitions(
- backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS,
remoteTopic);
Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
log.info("Initial checkpoints: {}", initialCheckpoints);
try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
@@ -958,38 +977,39 @@ public class MirrorConnectorsIntegrationBaseTest {
return allConfigs.get(configName).value();
}
}
-
- /*
- * produce messages to the cluster and topic
- */
- protected void produceMessages(EmbeddedConnectCluster cluster, String
topicName) {
- Map<String, String> recordSent = generateRecords(NUM_RECORDS_PRODUCED);
- for (Map.Entry<String, String> entry : recordSent.entrySet()) {
- produce(cluster.kafka(), topicName, null, entry.getKey(),
entry.getValue());
- }
+
+ protected void produceMessages(Producer<byte[], byte[]> producer, String
topicName) {
+ produceMessages(producer, TestUtils.generateRecords(topicName,
NUM_RECORDS_PRODUCED));
}
- /*
- * produce messages to the cluster and topic partition less than
numPartitions
- */
- private void produceMessages(EmbeddedConnectCluster cluster, String
topicName, int numPartitions) {
- int cnt = 0;
- for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
- for (int p = 0; p < numPartitions; p++)
- produce(cluster.kafka(), topicName, p, "key", "value-" +
cnt++);
+ protected void produceMessages(Producer<byte[], byte[]> producer, String
topicName, int numPartitions) {
+ produceMessages(producer, TestUtils.generateRecords(topicName,
NUM_RECORDS_PER_PARTITION, numPartitions));
+ }
+
+
+ protected Producer<byte[], byte[]>
initializeProducer(EmbeddedConnectCluster cluster) {
+ return cluster.kafka().createProducer(Collections.emptyMap());
}
/**
- * Produce a test record to a Kafka cluster.
- * This method allows subclasses to configure and use their own Kafka
Producer instead of using the built-in default.
- * @param cluster Kafka cluster that should receive the record
- * @param topic Topic to send the record to, non-null
- * @param partition Partition to send the record to, maybe null.
- * @param key Kafka key for the record
- * @param value Kafka value for the record
+ * Produce a batch of records with the specified producer
+ * @param producer Producer to use to send records
+ * @param records Records to send in one parallel batch
*/
- protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer
partition, String key, String value) {
- cluster.produce(topic, partition, key, value);
+ protected void produceMessages(Producer<byte[], byte[]> producer,
List<ProducerRecord<byte[], byte[]>> records) {
+ List<Future<RecordMetadata>> futures = new ArrayList<>();
+ for (ProducerRecord<byte[], byte[]> record : records) {
+ futures.add(producer.send(record));
+ }
+ Timer timer = Time.SYSTEM.timer(RECORD_PRODUCE_DURATION_MS);
+ try {
+ for (Future<RecordMetadata> future : futures) {
+ future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+ timer.update();
+ }
+ } catch (ExecutionException | InterruptedException | TimeoutException
e) {
+ throw new RuntimeException(e);
+ }
}
private static Map<TopicPartition, OffsetAndMetadata>
waitForCheckpointOnAllPartitions(
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
index 6ac09ef32bb..c192d420375 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
@@ -16,17 +16,16 @@
*/
package org.apache.kafka.connect.mirror.integration;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.BeforeEach;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
/**
* Integration test for MirrorMaker2 in which source records are emitted with
a transactional producer,
@@ -34,8 +33,6 @@ import java.util.concurrent.TimeUnit;
*/
public class MirrorConnectorsIntegrationTransactionsTest extends
MirrorConnectorsIntegrationBaseTest {
- private Map<String, Object> producerProps = new HashMap<>();
-
@BeforeEach
@Override
public void startClusters() throws Exception {
@@ -43,24 +40,29 @@ public class MirrorConnectorsIntegrationTransactionsTest
extends MirrorConnector
backupBrokerProps.put("transaction.state.log.replication.factor", "1");
primaryBrokerProps.put("transaction.state.log.min.isr", "1");
backupBrokerProps.put("transaction.state.log.min.isr", "1");
+ super.startClusters();
+ }
+
+ @Override
+ protected Producer<byte[], byte[]>
initializeProducer(EmbeddedConnectCluster cluster) {
+ Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"embedded-kafka-0");
- super.startClusters();
+ KafkaProducer<byte[], byte[]> producer =
cluster.kafka().createProducer(producerProps);
+ producer.initTransactions();
+ return producer;
}
- /**
- * Produce records with a short-lived transactional producer to interleave
transaction markers in the topic.
- */
+
@Override
- protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer
partition, String key, String value) {
- ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic,
partition, key == null ? null : key.getBytes(), value == null ? null :
value.getBytes());
- try (Producer<byte[], byte[]> producer =
cluster.createProducer(producerProps)) {
- producer.initTransactions();
+ protected void produceMessages(Producer<byte[], byte[]> producer,
List<ProducerRecord<byte[], byte[]>> records) {
+ try {
producer.beginTransaction();
- producer.send(msg).get(120000, TimeUnit.MILLISECONDS);
+ super.produceMessages(producer, records);
producer.commitTransaction();
- } catch (Exception e) {
- throw new KafkaException("Could not produce message: " + msg, e);
+ } catch (RuntimeException e) {
+ producer.abortTransaction();
+ throw e;
}
}
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
index ab149e68753..281bb88456e 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
@@ -187,8 +187,8 @@ public class
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
@Test
public void testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
throws Exception {
- produceMessages(primary, "test-topic-1");
- produceMessages(backup, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
+ produceMessages(backupProducer, "test-topic-1");
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need
to wait for discovery
@@ -224,8 +224,8 @@ public class
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
@Test
public void testCreatePartitionsUseProvidedForwardingAdmin() throws
Exception {
mm2Config = new MirrorMakerConfig(mm2Props);
- produceMessages(backup, "test-topic-1");
- produceMessages(primary, "test-topic-1");
+ produceMessages(backupProducer, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need
to wait for discovery
@@ -257,8 +257,8 @@ public class
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
public void testSyncTopicConfigUseProvidedForwardingAdmin() throws
Exception {
mm2Props.put("sync.topic.configs.enabled", "true");
mm2Config = new MirrorMakerConfig(mm2Props);
- produceMessages(backup, "test-topic-1");
- produceMessages(primary, "test-topic-1");
+ produceMessages(backupProducer, "test-topic-1");
+ produceMessages(primaryProducer, "test-topic-1");
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need
to wait for discovery