This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch kafka-16383 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b7cda81773dafc3a11ef269cd490b8f855282195 Author: Chris Egerton <[email protected]> AuthorDate: Mon Jul 15 17:05:24 2024 -0400 KAFKA-16383: Ensure tasks have already polled their consumers before producing verified records in MirrorConnectorsIntegrationBaseTest::testReplicateFromLatest --- .../MirrorConnectorsIntegrationBaseTest.java | 34 ++++++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 396ba48901f..3b6f52ca191 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -932,6 +932,9 @@ public class MirrorConnectorsIntegrationBaseTest { String topic = "test-topic-1"; produceMessages(primaryProducer, topic, NUM_PARTITIONS); + String sentinelTopic = "test-topic-sentinel"; + primary.kafka().createTopic(sentinelTopic); + // consume from the ends of topics when no committed offsets are found mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest"); // one way replication from primary to backup @@ -939,18 +942,44 @@ public class MirrorConnectorsIntegrationBaseTest { mm2Config = new MirrorMakerConfig(mm2Props); waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + String backupSentinelTopic = remoteTopicName(sentinelTopic, PRIMARY_CLUSTER_ALIAS); + waitForTopicCreated(backup, backupSentinelTopic); + + // wait for proof that the task has managed to poll its consumers at least once; + // this should also mean that it knows the proper end offset of the other test topic, + // and will consume exactly the expected number of records that we produce after + // this assertion passes + // NOTE: this assumes that there is a single MirrorSourceTask instance running; + // if there are multiple tasks, the logic will need to be updated to ensure that each + // task has managed to poll its consumer before continuing + waitForCondition( + () -> { + primary.kafka().produce(sentinelTopic, "sentinel-value"); + int sentinelValues = backup.kafka().consumeAll(1_000, backupSentinelTopic).count(); + return sentinelValues > 0; + }, + RECORD_TRANSFER_DURATION_MS, + "Records were not produced to sentinel topic in time" + ); + // produce some more messages to the topic, now that MM2 is running and replication should be taking place produceMessages(primaryProducer, topic, NUM_PARTITIONS); String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); // wait for at least the expected number of records to be replicated to the backup cluster - backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic); + backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic); + // consume all records from backup cluster ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic); + // ensure that we only replicated the records produced after startup replicatedRecords.partitions().forEach(topicPartition -> { int replicatedCount = replicatedRecords.records(topicPartition).size(); - assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount); + assertEquals( + NUM_RECORDS_PER_PARTITION, + replicatedCount, + "Unexpected number of replicated records for partition " + topicPartition.partition() + ); }); } @@ -1324,7 +1353,6 @@ public class MirrorConnectorsIntegrationBaseTest { private static Map<String, String> basicMM2Config() { Map<String, String> mm2Props = new HashMap<>(); mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS); - mm2Props.put("max.tasks", "10"); mm2Props.put("groups", "consumer-group-.*"); mm2Props.put("sync.topic.acls.enabled", "false"); mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
