This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 15b62351a13 MINOR: Add readiness check for connector and separate 
Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic 
(#16306)
15b62351a13 is described below

commit 15b62351a138dd88c3680e37f7b2375faa27f72c
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 13 05:43:33 2024 +0200

    MINOR: Add readiness check for connector and separate Kafka cluster in 
ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)
    
    Reviewers: Greg Harris <[email protected]>
---
 .../integration/ExactlyOnceSourceIntegrationTest.java   | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 6d4b648201a..84ee814ae40 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -99,6 +99,7 @@ import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXA
 import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
 import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
 import static 
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -747,9 +748,18 @@ public class ExactlyOnceSourceIntegrationTest {
         workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
globalOffsetsTopic);
 
         startConnect();
-        EmbeddedKafkaCluster connectorTargetedCluster = new 
EmbeddedKafkaCluster(1, brokerProps);
+
+        int numConnectorTargetedBrokers = 1;
+        EmbeddedKafkaCluster connectorTargetedCluster = new 
EmbeddedKafkaCluster(numConnectorTargetedBrokers, brokerProps);
         try (Closeable clusterShutdown = connectorTargetedCluster::stop) {
             connectorTargetedCluster.start();
+            // Wait for the connector-targeted Kafka cluster to get on its feet
+            waitForCondition(
+                    () -> connectorTargetedCluster.runningBrokers().size() == 
numConnectorTargetedBrokers,
+                    ConnectAssertions.WORKER_SETUP_DURATION_MS,
+                    "Separate Kafka cluster did not start in time"
+            );
+
             String topic = "test-topic";
             connectorTargetedCluster.createTopic(topic, 3);
 
@@ -777,6 +787,11 @@ public class ExactlyOnceSourceIntegrationTest {
 
             // start a source connector
             connect.configureConnector(CONNECTOR_NAME, props);
+            connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                    CONNECTOR_NAME,
+                    numTasks,
+                    "connector and tasks did not start in time"
+            );
 
             log.info("Waiting for records to be provided to worker by task");
             // wait for the connector tasks to produce enough records

Reply via email to