This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 1b1821dbffc KAFKA-16935: Automatically wait for cluster startup in 
embedded Connect integration tests (#16288)
1b1821dbffc is described below

commit 1b1821dbffcb1f26cf4b409a17f28900109b5a08
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 13 02:18:23 2024 +0200

    KAFKA-16935: Automatically wait for cluster startup in embedded Connect 
integration tests (#16288)
    
    Reviewers: Greg Harris <[email protected]>
---
 .../MirrorConnectorsIntegrationBaseTest.java       |  4 --
 .../connect/integration/BlockingConnectorTest.java |  5 --
 .../integration/ConnectWorkerIntegrationTest.java  | 66 +---------------------
 .../ConnectorClientPolicyIntegrationTest.java      |  2 -
 .../ConnectorRestartApiIntegrationTest.java        |  2 -
 .../ConnectorTopicsIntegrationTest.java            |  6 --
 .../integration/ErrorHandlingIntegrationTest.java  |  2 -
 .../ExactlyOnceSourceIntegrationTest.java          |  3 -
 .../integration/InternalTopicsIntegrationTest.java | 14 ++---
 .../integration/OffsetsApiIntegrationTest.java     |  9 ---
 .../RebalanceSourceConnectorsIntegrationTest.java  | 18 ------
 .../integration/RestExtensionIntegrationTest.java  |  3 -
 .../SessionedProtocolIntegrationTest.java          | 13 ++---
 .../integration/SinkConnectorsIntegrationTest.java |  2 -
 .../SourceConnectorsIntegrationTest.java           |  8 +--
 .../integration/TransformationIntegrationTest.java | 12 ----
 .../connect/util/clusters/EmbeddedConnect.java     | 24 +++++++-
 17 files changed, 34 insertions(+), 159 deletions(-)

diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 4fbd282d11c..5398f21c626 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -219,16 +219,12 @@ public class MirrorConnectorsIntegrationBaseTest {
                 .build();
         
         primary.start();
-        primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Workers of " + PRIMARY_CLUSTER_ALIAS + "-connect-cluster did 
not start in time.");
 
         waitForTopicCreated(primary, "mm2-status.backup.internal");
         waitForTopicCreated(primary, "mm2-offsets.backup.internal");
         waitForTopicCreated(primary, "mm2-configs.backup.internal");
 
         backup.start();
-        backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-            "Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not 
start in time.");
 
         primaryProducer = initializeProducer(primary);
         backupProducer = initializeProducer(backup);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 3eefee64c0d..532ab1baaf0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -134,11 +134,6 @@ public class BlockingConnectorTest {
 
         // start the clusters
         connect.start();
-
-        connect.assertions().assertAtLeastNumWorkersAreUp(
-                NUM_WORKERS,
-                "Initial group of workers did not start in time"
-        );
     }
 
     @After
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index c540016f104..84ff88013fe 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -172,9 +172,6 @@ public class ConnectWorkerIntegrationTest {
         // set up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
@@ -218,9 +215,6 @@ public class ConnectWorkerIntegrationTest {
         props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
         props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + 
BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress");
 
-        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         // Try to start the connector and its single task.
         connect.configureConnector(CONNECTOR_NAME, props);
 
@@ -258,9 +252,6 @@ public class ConnectWorkerIntegrationTest {
         // set up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
@@ -312,9 +303,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         // base connector props
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
         props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
@@ -352,8 +340,6 @@ public class ConnectWorkerIntegrationTest {
             .build();
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of 
workers did not start in time.");
-
         // and when the connector is not configured to create topics
         Map<String, String> props = 
defaultSourceConnectorProps("nonexistenttopic");
         props.remove(DEFAULT_TOPIC_CREATION_PREFIX + 
REPLICATION_FACTOR_CONFIG);
@@ -405,9 +391,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         // Want to make sure to use multiple tasks
         final int numTasks = 4;
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
@@ -497,9 +480,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
         // Fail the connector on startup
         props.put("connector.start.inject.error", "true");
@@ -572,11 +552,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(
-            NUM_WORKERS,
-            "Initial group of workers did not start in time."
-        );
-
         connect.configureConnector(CONNECTOR_NAME, 
defaultSourceConnectorProps(TOPIC_NAME));
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
             CONNECTOR_NAME,
@@ -600,9 +575,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-            "Initial group of workers did not start in time.");
-
         CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest(
             CONNECTOR_NAME,
             defaultSourceConnectorProps(TOPIC_NAME),
@@ -634,9 +606,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-            "Initial group of workers did not start in time.");
-
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
         // Configure the connector to produce a maximum of 10 messages
@@ -687,9 +656,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-            "Initial group of workers did not start in time.");
-
         // Create topic and produce 10 messages
         connect.kafka().createTopic(TOPIC_NAME);
         for (int i = 0; i < 10; i++) {
@@ -754,9 +720,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-            "Initial group of workers did not start in time.");
-
         // Create a connector with PAUSED initial state
         CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest(
             CONNECTOR_NAME,
@@ -806,9 +769,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         connect.kafka().createTopic(TOPIC_NAME);
 
         Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
@@ -859,8 +819,6 @@ public class ConnectWorkerIntegrationTest {
                 .numWorkers(1)
                 .build();
         connect.start();
-        connect.assertions().assertAtLeastNumWorkersAreUp(1,
-                "Worker did not start in time");
 
         Map<String, String> connectorConfig1 = 
defaultSourceConnectorProps(TOPIC_NAME);
         Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
@@ -927,8 +885,6 @@ public class ConnectWorkerIntegrationTest {
 
         connect.start();
 
-        connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not 
brought up in time");
-
         Map<String, String> connectorWithBlockingTaskStopConfig = new 
HashMap<>();
         connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSourceConnector.class.getName());
         connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
@@ -1008,11 +964,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(
-                NUM_WORKERS,
-                "Initial group of workers did not start in time."
-        );
-
         Map<String, String> connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
         int maxTasks = 1;
         connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
@@ -1177,11 +1128,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(
-                numWorkers,
-                "Initial group of workers did not start in time."
-        );
-
         final String connectorTopic = "connector-topic";
         connect.kafka().createTopic(connectorTopic, 1);
 
@@ -1214,7 +1160,7 @@ public class ConnectWorkerIntegrationTest {
 
         connect.assertions().assertAtLeastNumWorkersAreUp(
                 numWorkers,
-                "Initial group of workers did not start in time."
+                "Workers did not start in time after cluster was rolled."
         );
 
         final TopicPartition connectorTopicPartition = new 
TopicPartition(connectorTopic, 0);
@@ -1270,11 +1216,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(
-                numWorkers,
-                "Initial group of workers did not start in time."
-        );
-
         final String firstConnectorTopic = "connector-topic-1";
         connect.kafka().createTopic(firstConnectorTopic);
 
@@ -1344,11 +1285,6 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(
-                NUM_WORKERS,
-                "Initial group of workers did not start in time."
-        );
-
         final String topic = "kafka9228";
         connect.kafka().createTopic(topic, 1);
         connect.kafka().produce(topic, "non-json-value");
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index a0abece17d7..f09a949010c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -121,8 +121,6 @@ public class ConnectorClientPolicyIntegrationTest {
 
         // start the clusters
         connect.start();
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
 
         return connect;
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
index a512eeaae0a..e957b97f611 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
@@ -113,8 +113,6 @@ public class ConnectorRestartApiIntegrationTest {
             connect.start();
             return connect;
         });
-        connect.assertions().assertExactlyNumWorkersAreUp(numWorkers,
-                "Initial group of workers did not start in time.");
     }
 
     @After
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index eb055ab13fb..e9fdd91c387 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -115,8 +115,6 @@ public class ConnectorTopicsIntegrationTest {
         connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
         connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
-
         connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, 
Collections.emptyList(),
                 "Active topic set is not empty for connector: " + 
FOO_CONNECTOR);
 
@@ -179,8 +177,6 @@ public class ConnectorTopicsIntegrationTest {
         connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
         connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
-
         connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, 
Collections.emptyList(),
                 "Active topic set is not empty for connector: " + 
FOO_CONNECTOR);
 
@@ -235,8 +231,6 @@ public class ConnectorTopicsIntegrationTest {
         connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
         connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
-
         // start a source connector
         connect.configureConnector(FOO_CONNECTOR, 
defaultSourceConnectorProps(FOO_TOPIC));
         
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, 
NUM_TASKS,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 1a76956be61..30fe48116fc 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -94,8 +94,6 @@ public class ErrorHandlingIntegrationTest {
 
         // start Connect cluster
         connect.start();
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
 
         // get connector handles before starting test.
         connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 04dd4c7de67..6d4b648201a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -500,9 +500,6 @@ public class ExactlyOnceSourceIntegrationTest {
         connectorHandle.expectedRecords(MINIMUM_MESSAGES);
         connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
-        // make sure the worker is actually up (otherwise, it may fence out 
our simulated zombie leader, instead of the other way around)
-        connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker 
did not complete startup in time");
-
         // fence out the leader of the cluster
         Producer<?, ?> zombieLeader = transactionalProducer(
                 "simulated-zombie-leader",
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
index d73d1c4ed06..c044bb82298 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
@@ -72,8 +72,6 @@ public class InternalTopicsIntegrationTest {
 
         // Start the Connect cluster
         connect.start();
-        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers 
did not start in time.");
-        connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker 
did not start in time.");
         log.info("Completed startup of {} Kafka brokers and {} Connect 
workers", numBrokers, numWorkers);
 
         // Check the topics
@@ -111,9 +109,6 @@ public class InternalTopicsIntegrationTest {
 
         // Start the Connect cluster
         connect.start();
-        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker 
did not start in time.");
-        connect.assertions().assertAtLeastNumWorkersAreUp(numWorkers, "Worker 
did not start in time.");
-        log.info("Completed startup of {} Kafka brokers and {} Connect 
workers", numBrokers, numWorkers);
 
         // Check the topics
         log.info("Verifying the internal topics for Connect");
@@ -126,7 +121,7 @@ public class InternalTopicsIntegrationTest {
         
workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, 
"3");
         
workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, 
"2");
         
workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, 
"1");
-        int numWorkers = 1;
+        int numWorkers = 0;
         int numBrokers = 1;
         connect = new 
EmbeddedConnectCluster.Builder().name("connect-cluster-1")
                                                       .workerProps(workerProps)
@@ -137,11 +132,14 @@ public class InternalTopicsIntegrationTest {
 
         // Start the brokers and Connect, but Connect should fail to create 
config and offset topic
         connect.start();
-        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker 
did not start in time.");
         log.info("Completed startup of {} Kafka broker. Expected Connect 
worker to fail", numBrokers);
 
+        // Try to start a worker
+        connect.addWorker();
+
         // Verify that the offset and config topic don't exist;
         // the status topic may have been created if timing was right but we 
don't care
+        // TODO: Synchronously await and verify that the worker fails during 
startup
         log.info("Verifying the internal topics for Connect");
         connect.assertions().assertTopicsDoNotExist(configTopic(), 
offsetTopic());
     }
@@ -169,7 +167,6 @@ public class InternalTopicsIntegrationTest {
         // Start the brokers but not Connect
         log.info("Starting {} Kafka brokers, but no Connect workers yet", 
numBrokers);
         connect.start();
-        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker 
did not start in time.");
         log.info("Completed startup of {} Kafka broker. Expected Connect 
worker to fail", numBrokers);
 
         // Create the good topics
@@ -243,7 +240,6 @@ public class InternalTopicsIntegrationTest {
         // Start the brokers but not Connect
         log.info("Starting {} Kafka brokers, but no Connect workers yet", 
numBrokers);
         connect.start();
-        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker 
did not start in time.");
         log.info("Completed startup of {} Kafka broker. Expected Connect 
worker to fail", numBrokers);
 
         // Create the valid internal topics w/o topic settings, so these will 
use the broker's
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index 2da52cf9abd..dc507b68df7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -143,15 +143,6 @@ public class OffsetsApiIntegrationTest {
 
             result.start();
 
-            try {
-                result.assertions().assertExactlyNumWorkersAreUp(
-                        NUM_WORKERS,
-                        "Workers did not complete startup in time"
-                );
-            } catch (InterruptedException e) {
-                throw new RuntimeException("Interrupted while awaiting cluster 
startup", e);
-            }
-
             return result;
         });
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 82004c8dc3a..4c803a34921 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -116,9 +116,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Connect workers did not start in time.");
-
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
@@ -147,9 +144,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Connect workers did not start in time.");
-
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
@@ -194,9 +188,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Connect workers did not start in time.");
-
         // start several source connectors
         IntStream.range(0, 4).forEachOrdered(i -> 
connect.configureConnector(CONNECTOR_NAME + i, props));
 
@@ -221,9 +212,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Connect workers did not start in time.");
-
         // start a source connector
         IntStream.range(0, 4).forEachOrdered(i -> 
connect.configureConnector(CONNECTOR_NAME + i, props));
 
@@ -250,9 +238,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
-                "Connect workers did not start in time.");
-
         // start a source connector
         IntStream.range(0, 4).forEachOrdered(i -> 
connect.configureConnector(CONNECTOR_NAME + i, props));
 
@@ -276,9 +261,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
 
-        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
-                "Connect workers did not start in time.");
-
         // start a source connector
         IntStream.range(0, 4).forEachOrdered(i -> 
connect.configureConnector(CONNECTOR_NAME + i, props));
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index a0f993f2f63..112dd219811 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -77,9 +77,6 @@ public class RestExtensionIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-                "Initial group of workers did not start in time.");
-
         WorkerHandle worker = connect.workers().stream()
             .findFirst()
             .orElseThrow(() -> new AssertionError("At least one worker handle 
should be available"));
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
index 7ced24c82f8..8f71033b798 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
@@ -19,9 +19,7 @@ package org.apache.kafka.connect.integration;
 import 
org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
-import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -98,10 +96,6 @@ public class SessionedProtocolIntegrationTest {
         invalidSignatureHeaders.put(SIGNATURE_HEADER, "S2Fma2Flc3F1ZQ==");
         invalidSignatureHeaders.put(SIGNATURE_ALGORITHM_HEADER, "HmacSHA256");
 
-        TestUtils.waitForCondition(
-                () -> 
connect.workers().stream().allMatch(WorkerHandle::isRunning),
-                30000L, "Timed out waiting for workers to start");
-
         // We haven't created the connector yet, but this should still return 
a 400 instead of a 404
         // if the endpoint is secured
         log.info(
@@ -120,9 +114,10 @@ public class SessionedProtocolIntegrationTest {
                 + "expecting 403 error response",
             connectorTasksEndpoint
         );
-        TestUtils.waitForCondition(
-                () -> connect.requestPost(connectorTasksEndpoint, "[]", 
invalidSignatureHeaders).getStatus() == FORBIDDEN.getStatusCode(),
-                30000L, "Timed out waiting for workers to start");
+        assertEquals(
+                FORBIDDEN.getStatusCode(),
+                connect.requestPost(connectorTasksEndpoint, "[]", 
invalidSignatureHeaders).getStatus()
+        );
 
         // Create the connector now
         // setup up props for the sink connector
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
index a8bfbb291ff..f42addf396c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
@@ -83,7 +83,6 @@ public class SinkConnectorsIntegrationTest {
                 .brokerProps(brokerProps)
                 .build();
         connect.start();
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
     }
 
     @After
@@ -209,7 +208,6 @@ public class SinkConnectorsIntegrationTest {
         final Collection<String> topics = Arrays.asList(topic1, topic2, 
topic3);
 
         Map<String, String> connectorProps = 
baseSinkConnectorProps(String.join(",", topics));
-        // Need an eager assignor here; round robin is as good as any
         connectorProps.put(
                 CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                 CooperativeStickyAssignor.class.getName());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
index b35b0720802..046e5a9ccf7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
@@ -103,8 +103,6 @@ public class SourceConnectorsIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
-
         Map<String, String> fooProps = 
sourceConnectorPropsWithGroups(FOO_TOPIC);
 
         // start a source connector
@@ -128,8 +126,6 @@ public class SourceConnectorsIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
-
         Map<String, String> fooProps = 
sourceConnectorPropsWithGroups(FOO_TOPIC);
 
         // start a source connector
@@ -160,8 +156,6 @@ public class SourceConnectorsIntegrationTest {
         connect.assertions().assertTopicSettings(BAR_TOPIC, 
DEFAULT_REPLICATION_FACTOR,
                 DEFAULT_PARTITIONS, "Topic " + BAR_TOPIC + " does not have the 
expected settings");
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
-
         Map<String, String> barProps = defaultSourceConnectorProps(BAR_TOPIC);
         // start a source connector with topic creation properties
         connect.configureConnector(BAR_CONNECTOR, barProps);
@@ -195,7 +189,7 @@ public class SourceConnectorsIntegrationTest {
         workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(true));
 
         IntStream.range(0, 3).forEach(i -> connect.addWorker());
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Initial group of workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, 
"Workers did not start in time after cluster was rolled.");
 
         
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR, 
NUM_TASKS,
                 "Connector tasks did not start in time.");
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
index 02d8c7f71b1..760684425df 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
@@ -115,8 +115,6 @@ public class TransformationIntegrationTest {
      */
     @Test
     public void testFilterOnTopicNameWithSinkConnector() throws Exception {
-        assertConnectReady();
-
         Map<String, Long> observedRecords = observeRecords();
 
         // create test topics
@@ -180,12 +178,6 @@ public class TransformationIntegrationTest {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
-    private void assertConnectReady() throws InterruptedException {
-        connect.assertions().assertExactlyNumBrokersAreUp(1, "Brokers did not 
start in time.");
-        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, "Worker 
did not start in time.");
-        log.info("Completed startup of {} Kafka brokers and {} Connect 
workers", 1, NUM_WORKERS);
-    }
-
     private void assertConnectorRunning() throws InterruptedException {
         
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 NUM_TASKS,
                 "Connector tasks did not start in time.");
@@ -212,8 +204,6 @@ public class TransformationIntegrationTest {
      */
     @Test
     public void testFilterOnTombstonesWithSinkConnector() throws Exception {
-        assertConnectReady();
-
         Map<String, Long> observedRecords = observeRecords();
 
         // create test topics
@@ -273,8 +263,6 @@ public class TransformationIntegrationTest {
      */
     @Test
     public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() 
throws Exception {
-        assertConnectReady();
-
         // setup up props for the sink connector
         Map<String, String> props = new HashMap<>();
         props.put("name", CONNECTOR_NAME);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index a37de76d1ae..e7e268425c7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -118,7 +118,8 @@ abstract class EmbeddedConnect {
     };
 
     /**
-     * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
+     * Start the Connect cluster and the embedded Kafka and Zookeeper cluster,
+     * and wait for the Kafka and Connect clusters to become healthy.
      */
     public void start() {
         if (maskExitProcedures) {
@@ -132,6 +133,27 @@ abstract class EmbeddedConnect {
         } catch (Exception e) {
             throw new ConnectException("Failed to start HTTP client", e);
         }
+
+        try {
+            if (numBrokers > 0) {
+                assertions().assertExactlyNumBrokersAreUp(
+                        numBrokers,
+                        "Kafka cluster did not start in time"
+                );
+                log.info("Completed startup of {} Kafka brokers", numBrokers);
+            }
+
+            int numWorkers = workers().size();
+            if (numWorkers > 0) {
+                assertions().assertExactlyNumWorkersAreUp(
+                        numWorkers,
+                        "Connect cluster did not start in time"
+                );
+                log.info("Completed startup of {} Connect workers", 
numWorkers);
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while awaiting cluster 
startup", e);
+        }
     }
 
     /**


Reply via email to