This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 177b38ad66d KAFKA-16383: Ensure tasks have already polled their
consumers before producing verified records in
MirrorConnectorsIntegrationBaseTest::testReplicateFromLatest (#16598)
177b38ad66d is described below
commit 177b38ad66daef4d62c8b5fe5ba88ba8a5eaa5f4
Author: Chris Egerton <[email protected]>
AuthorDate: Wed Jul 17 01:42:31 2024 +0200
KAFKA-16383: Ensure tasks have already polled their consumers before
producing verified records in
MirrorConnectorsIntegrationBaseTest::testReplicateFromLatest (#16598)
Reviewers: Greg Harris <[email protected]>
---
.../MirrorConnectorsIntegrationBaseTest.java | 34 ++++++++++++++++++++--
1 file changed, 31 insertions(+), 3 deletions(-)
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 396ba48901f..3b6f52ca191 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -932,6 +932,9 @@ public class MirrorConnectorsIntegrationBaseTest {
String topic = "test-topic-1";
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
+ String sentinelTopic = "test-topic-sentinel";
+ primary.kafka().createTopic(sentinelTopic);
+
// consume from the ends of topics when no committed offsets are found
mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." +
AUTO_OFFSET_RESET_CONFIG, "latest");
// one way replication from primary to backup
@@ -939,18 +942,44 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+ String backupSentinelTopic = remoteTopicName(sentinelTopic,
PRIMARY_CLUSTER_ALIAS);
+ waitForTopicCreated(backup, backupSentinelTopic);
+
+ // wait for proof that the task has managed to poll its consumers at least once;
+ // this should also mean that it knows the proper end offset of the other test topic,
+ // and will consume exactly the expected number of records that we produce after
+ // this assertion passes
+ // NOTE: this assumes that there is a single MirrorSourceTask instance running;
+ // if there are multiple tasks, the logic will need to be updated to ensure that each
+ // task has managed to poll its consumer before continuing
+ waitForCondition(
+ () -> {
+ primary.kafka().produce(sentinelTopic, "sentinel-value");
+ int sentinelValues = backup.kafka().consumeAll(1_000,
backupSentinelTopic).count();
+ return sentinelValues > 0;
+ },
+ RECORD_TRANSFER_DURATION_MS,
+ "Records were not produced to sentinel topic in time"
+ );
+
// produce some more messages to the topic, now that MM2 is running
and replication should be taking place
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
// wait for at least the expected number of records to be replicated
to the backup cluster
- backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION,
RECORD_TRANSFER_DURATION_MS, backupTopic);
+ backup.kafka().consume(NUM_RECORDS_PRODUCED,
RECORD_TRANSFER_DURATION_MS, backupTopic);
+
// consume all records from backup cluster
ConsumerRecords<byte[], byte[]> replicatedRecords =
backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
+
// ensure that we only replicated the records produced after startup
replicatedRecords.partitions().forEach(topicPartition -> {
int replicatedCount =
replicatedRecords.records(topicPartition).size();
- assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount);
+ assertEquals(
+ NUM_RECORDS_PER_PARTITION,
+ replicatedCount,
+ "Unexpected number of replicated records for partition " +
topicPartition.partition()
+ );
});
}
@@ -1324,7 +1353,6 @@ public class MirrorConnectorsIntegrationBaseTest {
private static Map<String, String> basicMM2Config() {
Map<String, String> mm2Props = new HashMap<>();
mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " +
BACKUP_CLUSTER_ALIAS);
- mm2Props.put("max.tasks", "10");
mm2Props.put("groups", "consumer-group-.*");
mm2Props.put("sync.topic.acls.enabled", "false");
mm2Props.put("emit.checkpoints.interval.seconds",
String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));