This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch kafka-16383
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b7cda81773dafc3a11ef269cd490b8f855282195
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Jul 15 17:05:24 2024 -0400

    KAFKA-16383: Ensure tasks have already polled their consumers before producing verified records in MirrorConnectorsIntegrationBaseTest::testReplicateFromLatest
---
 .../MirrorConnectorsIntegrationBaseTest.java       | 34 ++++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 396ba48901f..3b6f52ca191 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -932,6 +932,9 @@ public class MirrorConnectorsIntegrationBaseTest {
         String topic = "test-topic-1";
         produceMessages(primaryProducer, topic, NUM_PARTITIONS);
 
+        String sentinelTopic = "test-topic-sentinel";
+        primary.kafka().createTopic(sentinelTopic);
+
         // consume from the ends of topics when no committed offsets are found
         mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest");
         // one way replication from primary to backup
@@ -939,18 +942,44 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
 
+        String backupSentinelTopic = remoteTopicName(sentinelTopic, PRIMARY_CLUSTER_ALIAS);
+        waitForTopicCreated(backup, backupSentinelTopic);
+
+        // wait for proof that the task has managed to poll its consumers at least once;
+        // this should also mean that it knows the proper end offset of the other test topic,
+        // and will consume exactly the expected number of records that we produce after
+        // this assertion passes
+        // NOTE: this assumes that there is a single MirrorSourceTask instance running;
+        // if there are multiple tasks, the logic will need to be updated to ensure that each
+        // task has managed to poll its consumer before continuing
+        waitForCondition(
+                () -> {
+                    primary.kafka().produce(sentinelTopic, "sentinel-value");
+                    int sentinelValues = backup.kafka().consumeAll(1_000, backupSentinelTopic).count();
+                    return sentinelValues > 0;
+                },
+                RECORD_TRANSFER_DURATION_MS,
+                "Records were not produced to sentinel topic in time"
+        );
+
         // produce some more messages to the topic, now that MM2 is running and replication should be taking place
         produceMessages(primaryProducer, topic, NUM_PARTITIONS);
 
         String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
         // wait for at least the expected number of records to be replicated to the backup cluster
-        backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic);
+        backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic);
+
         // consume all records from backup cluster
         ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
+
         // ensure that we only replicated the records produced after startup
         replicatedRecords.partitions().forEach(topicPartition -> {
             int replicatedCount = replicatedRecords.records(topicPartition).size();
-            assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount);
+            assertEquals(
+                    NUM_RECORDS_PER_PARTITION,
+                    replicatedCount,
+                    "Unexpected number of replicated records for partition " + topicPartition.partition()
+            );
         });
     }
 
@@ -1324,7 +1353,6 @@ public class MirrorConnectorsIntegrationBaseTest {
     private static Map<String, String> basicMM2Config() {
         Map<String, String> mm2Props = new HashMap<>();
         mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
-        mm2Props.put("max.tasks", "10");
         mm2Props.put("groups", "consumer-group-.*");
         mm2Props.put("sync.topic.acls.enabled", "false");
         mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));

Reply via email to