This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b72b0abb1c MINOR: Optimize runtime of MM2 integration tests by 
batching transactions (#13816)
3b72b0abb1c is described below

commit 3b72b0abb1c38828555a33096a7431616be7b8c4
Author: Greg Harris <[email protected]>
AuthorDate: Wed Jun 21 14:51:54 2023 -0400

    MINOR: Optimize runtime of MM2 integration tests by batching transactions 
(#13816)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../org/apache/kafka/connect/mirror/TestUtils.java |  42 ++++++--
 .../MirrorConnectorsIntegrationBaseTest.java       | 114 ++++++++++++---------
 ...irrorConnectorsIntegrationTransactionsTest.java |  36 ++++---
 ...rsWithCustomForwardingAdminIntegrationTest.java |  12 +--
 4 files changed, 128 insertions(+), 76 deletions(-)

diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
index ebe77fddf92..6c42972a107 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java
@@ -16,7 +16,11 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,14 +38,40 @@ public class TestUtils {
         }
         return props;
     }
-    
-    /*
-     * return records with different but predictable key and value 
+
+    /**
+     * Assemble a collection of records arbitrarily distributed across all 
partitions of the specified topic
+     * @param topicName Destination topic
+     * @param numRecords count of records to produce to the topic in total
+     * @return A batch of records that can be sent to a producer.
      */
-    public static Map<String, String> generateRecords(int numRecords) {
-        Map<String, String> records = new HashMap<>();
+    public static List<ProducerRecord<byte[], byte[]>> generateRecords(String 
topicName, int numRecords) {
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
-            records.put("key-" + i, "message-" + i);
+            String key = "key-" + i;
+            String value = "message-" + i;
+            records.add(new ProducerRecord<>(topicName, null, key.getBytes(), 
value.getBytes()));
+        }
+        return records;
+    }
+
+    /**
+     * Assemble a collection of records evenly distributed across some 
partitions of the specified topic
+     * @param topicName Destination topic
+     * @param numRecords count of records to produce to each partition
+     * @param numPartitions number of partitions within the topic to write 
records to.
+     * @return A batch of records that can be sent to a producer.
+     */
+    public static List<ProducerRecord<byte[], byte[]>> generateRecords(String 
topicName, int numRecords, int numPartitions) {
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+        int cnt = 0;
+        for (int r = 0; r < numRecords; r++) {
+            for (int p = 0; p < numPartitions; p++) {
+                String value = "value-" + cnt;
+                String key = "key-" + cnt;
+                cnt++;
+                records.add(new ProducerRecord<>(topicName, p, key.getBytes(), 
value.getBytes()));
+            }
         }
         return records;
     }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index be99d58b5d7..bc5e0212bbf 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -28,10 +28,16 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
 import org.apache.kafka.connect.mirror.MirrorClient;
@@ -41,6 +47,7 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
 import org.apache.kafka.connect.mirror.SourceAndTarget;
 import org.apache.kafka.connect.mirror.Checkpoint;
 import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.TestUtils;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -55,7 +62,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -75,8 +85,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
-import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
-
 /**
  * Tests MM2 replication and failover/failback logic.
  *
@@ -93,6 +101,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected static final int NUM_PARTITIONS = 10;
     protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
     protected static final int OFFSET_LAG_MAX = 10;
+    protected static final int RECORD_PRODUCE_DURATION_MS = 20_000;
     protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
     protected static final int CHECKPOINT_DURATION_MS = 20_000;
     private static final int RECORD_CONSUME_DURATION_MS = 20_000;
@@ -114,6 +123,8 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected MirrorMakerConfig mm2Config;
     protected EmbeddedConnectCluster primary;
     protected EmbeddedConnectCluster backup;
+    protected Producer<byte[], byte[]> primaryProducer;
+    protected Producer<byte[], byte[]> backupProducer;
 
     protected Map<String, String> additionalPrimaryClusterClientsConfigs = new 
HashMap<>();
     protected Map<String, String> additionalBackupClusterClientsConfigs = new 
HashMap<>();
@@ -212,6 +223,9 @@ public class MirrorConnectorsIntegrationBaseTest {
         backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
             "Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not 
start in time.");
 
+        primaryProducer = initializeProducer(primary);
+        backupProducer = initializeProducer(backup);
+
         createTopics();
 
         waitForTopicCreated(backup, "mm2-status.primary.internal");
@@ -234,6 +248,8 @@ public class MirrorConnectorsIntegrationBaseTest {
     @AfterEach
     public void shutdownClusters() throws Exception {
         try {
+            Utils.closeQuietly(primaryProducer, "primary producer");
+            Utils.closeQuietly(backupProducer, "backup producer");
             for (String x : primary.connectors()) {
                 primary.deleteConnector(x);
             }
@@ -259,10 +275,10 @@ public class MirrorConnectorsIntegrationBaseTest {
     
     @Test
     public void testReplication() throws Exception {
-        produceMessages(primary, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
         String backupTopic1 = remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS);
         if (replicateBackupToPrimary) {
-            produceMessages(backup, "test-topic-1");
+            produceMessages(backupProducer, "test-topic-1");
         }
         String reverseTopic1 = remoteTopicName("test-topic-1", 
BACKUP_CLUSTER_ALIAS);
         String consumerGroupName = "consumer-group-testReplication";
@@ -370,7 +386,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         waitForTopicCreated(backup, backupTopic2);
 
         // only produce messages to the first partition
-        produceMessages(primary, "test-topic-2", 1);
+        produceMessages(primaryProducer, "test-topic-2", 1);
 
         // expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
         assertEquals(NUM_RECORDS_PER_PARTITION, 
primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, 
"test-topic-2").count(),
@@ -382,7 +398,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
             String reverseTopic3 = remoteTopicName("test-topic-3", 
BACKUP_CLUSTER_ALIAS);
             waitForTopicCreated(primary, reverseTopic3);
-            produceMessages(backup, "test-topic-3", 1);
+            produceMessages(backupProducer, "test-topic-3", 1);
             assertEquals(NUM_RECORDS_PER_PARTITION, 
backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, 
"test-topic-3").count(),
                     "Records were not produced to backup cluster.");
 
@@ -401,8 +417,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         primary.kafka().createTopic(topic, NUM_PARTITIONS);
 
         // produce to all test-topic-empty's partitions, except the last 
partition
-        produceMessages(primary, topic, NUM_PARTITIONS - 1);
-        
+        produceMessages(primaryProducer, topic, NUM_PARTITIONS - 1);
+
         // consume before starting the connectors, so we don't need to wait 
for discovery
         int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
         try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
@@ -449,7 +465,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     }
 
     private void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws 
InterruptedException {
-        produceMessages(primary, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
         String backupTopic1 = remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS);
         String consumerGroupName = 
"consumer-group-testOneWayReplicationWithAutoOffsetSync";
         Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
@@ -497,7 +513,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         waitForTopicCreated(backup, remoteTopic2);
 
         // produce some records to the new topic in primary cluster
-        produceMessages(primary, "test-topic-2");
+        produceMessages(primaryProducer, "test-topic-2");
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
@@ -535,7 +551,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         String consumerGroupName = "consumer-group-syncs-on-target";
         Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
 
-        produceMessages(primary, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
 
         warmUpConsumer(consumerProps);
 
@@ -571,7 +587,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
 
         // ensure there are some records in the topic on the source cluster
-        produceMessages(primary, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
 
         // warm up consumers before starting the connectors, so we don't need 
to wait for discovery
         warmUpConsumer(consumerProps);
@@ -609,7 +625,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to 
backup cluster");
 
         // Send some records to test-topic-no-checkpoints in the source cluster
-        produceMessages(primary, "test-topic-no-checkpoints");
+        produceMessages(primaryProducer, "test-topic-no-checkpoints");
 
         try (Consumer<byte[], byte[]> consumer = 
primary.kafka().createConsumer(consumerProps)) {
             Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
@@ -641,7 +657,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-        produceMessages(primary, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
         try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
             waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
         }
@@ -649,7 +665,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
         Thread.sleep(5000);
-        produceMessages(primary, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
         try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
             waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
         }
@@ -668,19 +684,22 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        // Produce and consume an initial batch of records to establish an 
initial checkpoint
+        produceMessages(primaryProducer, "test-topic-1");
+        warmUpConsumer(consumerProps);
+        MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+        Map<TopicPartition, OffsetAndMetadata> initialCheckpoints = 
waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
remoteTopic);
         // Produce a large number of records to the topic, all replicated 
within one MM2 lifetime.
         int iterations = 100;
-        for (int i = 0; i < iterations; i++) {
-            produceMessages(primary, "test-topic-1");
+        for (int i = 1; i < iterations; i++) {
+            produceMessages(primaryProducer, "test-topic-1");
         }
         waitForTopicCreated(backup, remoteTopic);
         assertEquals(iterations * NUM_RECORDS_PRODUCED, 
backup.kafka().consume(iterations * NUM_RECORDS_PRODUCED, 
RECORD_TRANSFER_DURATION_MS, remoteTopic).count(),
                 "Records were not replicated to backup cluster.");
-        // Once the replication has finished, we spin up the upstream consumer 
and start slowly consuming records
+        // Once the replication has finished, we spin up the upstream consumer 
again and start consuming records
         ConsumerRecords<byte[], byte[]> allRecords = 
primary.kafka().consume(iterations * NUM_RECORDS_PRODUCED, 
RECORD_CONSUME_DURATION_MS, "test-topic-1");
-        MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
-        Map<TopicPartition, OffsetAndMetadata> initialCheckpoints = 
waitForCheckpointOnAllPartitions(
-                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
remoteTopic);
         Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
         log.info("Initial checkpoints: {}", initialCheckpoints);
         try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
@@ -958,38 +977,39 @@ public class MirrorConnectorsIntegrationBaseTest {
             return allConfigs.get(configName).value();
         }
     }
-    
-    /*
-     *  produce messages to the cluster and topic 
-     */
-    protected void produceMessages(EmbeddedConnectCluster cluster, String 
topicName) {
-        Map<String, String> recordSent = generateRecords(NUM_RECORDS_PRODUCED);
-        for (Map.Entry<String, String> entry : recordSent.entrySet()) {
-            produce(cluster.kafka(), topicName, null, entry.getKey(), 
entry.getValue());
-        }
+
+    protected void produceMessages(Producer<byte[], byte[]> producer, String 
topicName) {
+        produceMessages(producer, TestUtils.generateRecords(topicName, 
NUM_RECORDS_PRODUCED));
     }
 
-    /*
-     * produce messages to the cluster and topic partition less than 
numPartitions 
-     */
-    private void produceMessages(EmbeddedConnectCluster cluster, String 
topicName, int numPartitions) {
-        int cnt = 0;
-        for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
-            for (int p = 0; p < numPartitions; p++)
-                produce(cluster.kafka(), topicName, p, "key", "value-" + 
cnt++);
+    protected void produceMessages(Producer<byte[], byte[]> producer, String 
topicName, int numPartitions) {
+        produceMessages(producer, TestUtils.generateRecords(topicName, 
NUM_RECORDS_PER_PARTITION, numPartitions));
+    }
+
+
+    protected Producer<byte[], byte[]> 
initializeProducer(EmbeddedConnectCluster cluster) {
+        return cluster.kafka().createProducer(Collections.emptyMap());
     }
 
     /**
-     * Produce a test record to a Kafka cluster.
-     * This method allows subclasses to configure and use their own Kafka 
Producer instead of using the built-in default.
-     * @param cluster   Kafka cluster that should receive the record
-     * @param topic     Topic to send the record to, non-null
-     * @param partition Partition to send the record to, maybe null.
-     * @param key       Kafka key for the record
-     * @param value     Kafka value for the record
+     * Produce a batch of records with the specified producer
+     * @param producer Producer to use to send records
+     * @param records Records to send in one parallel batch
      */
-    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer 
partition, String key, String value) {
-        cluster.produce(topic, partition, key, value);
+    protected void produceMessages(Producer<byte[], byte[]> producer, 
List<ProducerRecord<byte[], byte[]>> records) {
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        for (ProducerRecord<byte[], byte[]> record : records) {
+            futures.add(producer.send(record));
+        }
+        Timer timer = Time.SYSTEM.timer(RECORD_PRODUCE_DURATION_MS);
+        try {
+            for (Future<RecordMetadata> future : futures) {
+                future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                timer.update();
+            }
+        } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private static Map<TopicPartition, OffsetAndMetadata> 
waitForCheckpointOnAllPartitions(
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
index 6ac09ef32bb..c192d420375 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
@@ -16,17 +16,16 @@
  */
 package org.apache.kafka.connect.mirror.integration;
 
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.BeforeEach;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 
 /**
  * Integration test for MirrorMaker2 in which source records are emitted with 
a transactional producer,
@@ -34,8 +33,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class MirrorConnectorsIntegrationTransactionsTest extends 
MirrorConnectorsIntegrationBaseTest {
 
-    private Map<String, Object> producerProps = new HashMap<>();
-
     @BeforeEach
     @Override
     public void startClusters() throws Exception {
@@ -43,24 +40,29 @@ public class MirrorConnectorsIntegrationTransactionsTest 
extends MirrorConnector
         backupBrokerProps.put("transaction.state.log.replication.factor", "1");
         primaryBrokerProps.put("transaction.state.log.min.isr", "1");
         backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        super.startClusters();
+    }
+
+    @Override
+    protected Producer<byte[], byte[]> 
initializeProducer(EmbeddedConnectCluster cluster) {
+        Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
         producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"embedded-kafka-0");
-        super.startClusters();
+        KafkaProducer<byte[], byte[]> producer = 
cluster.kafka().createProducer(producerProps);
+        producer.initTransactions();
+        return producer;
     }
 
-    /**
-     * Produce records with a short-lived transactional producer to interleave 
transaction markers in the topic.
-     */
+
     @Override
-    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer 
partition, String key, String value) {
-        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, 
partition, key == null ? null : key.getBytes(), value == null ? null : 
value.getBytes());
-        try (Producer<byte[], byte[]> producer = 
cluster.createProducer(producerProps)) {
-            producer.initTransactions();
+    protected void produceMessages(Producer<byte[], byte[]> producer, 
List<ProducerRecord<byte[], byte[]>> records) {
+        try {
             producer.beginTransaction();
-            producer.send(msg).get(120000, TimeUnit.MILLISECONDS);
+            super.produceMessages(producer, records);
             producer.commitTransaction();
-        } catch (Exception e) {
-            throw new KafkaException("Could not produce message: " + msg, e);
+        } catch (RuntimeException e) {
+            producer.abortTransaction();
+            throw e;
         }
     }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
index ab149e68753..281bb88456e 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
@@ -187,8 +187,8 @@ public class 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
 
     @Test
     public void testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() 
throws Exception {
-        produceMessages(primary, "test-topic-1");
-        produceMessages(backup, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
+        produceMessages(backupProducer, "test-topic-1");
         String consumerGroupName = "consumer-group-testReplication";
         Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
         // warm up consumers before starting the connectors so we don't need 
to wait for discovery
@@ -224,8 +224,8 @@ public class 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
     @Test
     public void testCreatePartitionsUseProvidedForwardingAdmin() throws 
Exception {
         mm2Config = new MirrorMakerConfig(mm2Props);
-        produceMessages(backup, "test-topic-1");
-        produceMessages(primary, "test-topic-1");
+        produceMessages(backupProducer, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
         String consumerGroupName = "consumer-group-testReplication";
         Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
         // warm up consumers before starting the connectors so we don't need 
to wait for discovery
@@ -257,8 +257,8 @@ public class 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
     public void testSyncTopicConfigUseProvidedForwardingAdmin() throws 
Exception {
         mm2Props.put("sync.topic.configs.enabled", "true");
         mm2Config = new MirrorMakerConfig(mm2Props);
-        produceMessages(backup, "test-topic-1");
-        produceMessages(primary, "test-topic-1");
+        produceMessages(backupProducer, "test-topic-1");
+        produceMessages(primaryProducer, "test-topic-1");
         String consumerGroupName = "consumer-group-testReplication";
         Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
         // warm up consumers before starting the connectors so we don't need 
to wait for discovery

Reply via email to