This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 15b62351a13 MINOR: Add readiness check for connector and separate
Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic
(#16306)
15b62351a13 is described below
commit 15b62351a138dd88c3680e37f7b2375faa27f72c
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 13 05:43:33 2024 +0200
MINOR: Add readiness check for connector and separate Kafka cluster in
ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (#16306)
Reviewers: Greg Harris <[email protected]>
---
.../integration/ExactlyOnceSourceIntegrationTest.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 6d4b648201a..84ee814ae40 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -99,6 +99,7 @@ import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXA
import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
import static
org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -747,9 +748,18 @@ public class ExactlyOnceSourceIntegrationTest {
workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG,
globalOffsetsTopic);
startConnect();
- EmbeddedKafkaCluster connectorTargetedCluster = new
EmbeddedKafkaCluster(1, brokerProps);
+
+ int numConnectorTargetedBrokers = 1;
+ EmbeddedKafkaCluster connectorTargetedCluster = new
EmbeddedKafkaCluster(numConnectorTargetedBrokers, brokerProps);
try (Closeable clusterShutdown = connectorTargetedCluster::stop) {
connectorTargetedCluster.start();
+ // Wait for the connector-targeted Kafka cluster to get on its feet
+ waitForCondition(
+ () -> connectorTargetedCluster.runningBrokers().size() ==
numConnectorTargetedBrokers,
+ ConnectAssertions.WORKER_SETUP_DURATION_MS,
+ "Separate Kafka cluster did not start in time"
+ );
+
String topic = "test-topic";
connectorTargetedCluster.createTopic(topic, 3);
@@ -777,6 +787,11 @@ public class ExactlyOnceSourceIntegrationTest {
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ numTasks,
+ "connector and tasks did not start in time"
+ );
log.info("Waiting for records to be provided to worker by task");
// wait for the connector tasks to produce enough records