This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 1b1821dbffc KAFKA-16935: Automatically wait for cluster startup in
embedded Connect integration tests (#16288)
1b1821dbffc is described below
commit 1b1821dbffcb1f26cf4b409a17f28900109b5a08
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 13 02:18:23 2024 +0200
KAFKA-16935: Automatically wait for cluster startup in embedded Connect
integration tests (#16288)
Reviewers: Greg Harris <[email protected]>
---
.../MirrorConnectorsIntegrationBaseTest.java | 4 --
.../connect/integration/BlockingConnectorTest.java | 5 --
.../integration/ConnectWorkerIntegrationTest.java | 66 +---------------------
.../ConnectorClientPolicyIntegrationTest.java | 2 -
.../ConnectorRestartApiIntegrationTest.java | 2 -
.../ConnectorTopicsIntegrationTest.java | 6 --
.../integration/ErrorHandlingIntegrationTest.java | 2 -
.../ExactlyOnceSourceIntegrationTest.java | 3 -
.../integration/InternalTopicsIntegrationTest.java | 14 ++---
.../integration/OffsetsApiIntegrationTest.java | 9 ---
.../RebalanceSourceConnectorsIntegrationTest.java | 18 ------
.../integration/RestExtensionIntegrationTest.java | 3 -
.../SessionedProtocolIntegrationTest.java | 13 ++---
.../integration/SinkConnectorsIntegrationTest.java | 2 -
.../SourceConnectorsIntegrationTest.java | 8 +--
.../integration/TransformationIntegrationTest.java | 12 ----
.../connect/util/clusters/EmbeddedConnect.java | 24 +++++++-
17 files changed, 34 insertions(+), 159 deletions(-)
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 4fbd282d11c..5398f21c626 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -219,16 +219,12 @@ public class MirrorConnectorsIntegrationBaseTest {
.build();
primary.start();
- primary.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Workers of " + PRIMARY_CLUSTER_ALIAS + "-connect-cluster did
not start in time.");
waitForTopicCreated(primary, "mm2-status.backup.internal");
waitForTopicCreated(primary, "mm2-offsets.backup.internal");
waitForTopicCreated(primary, "mm2-configs.backup.internal");
backup.start();
- backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not
start in time.");
primaryProducer = initializeProducer(primary);
backupProducer = initializeProducer(backup);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 3eefee64c0d..532ab1baaf0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -134,11 +134,6 @@ public class BlockingConnectorTest {
// start the clusters
connect.start();
-
- connect.assertions().assertAtLeastNumWorkersAreUp(
- NUM_WORKERS,
- "Initial group of workers did not start in time"
- );
}
@After
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index c540016f104..84ff88013fe 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -172,9 +172,6 @@ public class ConnectWorkerIntegrationTest {
// set up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -218,9 +215,6 @@ public class ConnectWorkerIntegrationTest {
props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX +
BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress");
- connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
// Try to start the connector and its single task.
connect.configureConnector(CONNECTOR_NAME, props);
@@ -258,9 +252,6 @@ public class ConnectWorkerIntegrationTest {
// set up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -312,9 +303,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
// base connector props
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getSimpleName());
@@ -352,8 +340,6 @@ public class ConnectWorkerIntegrationTest {
.build();
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of
workers did not start in time.");
-
// and when the connector is not configured to create topics
Map<String, String> props =
defaultSourceConnectorProps("nonexistenttopic");
props.remove(DEFAULT_TOPIC_CREATION_PREFIX +
REPLICATION_FACTOR_CONFIG);
@@ -405,9 +391,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
// Want to make sure to use multiple tasks
final int numTasks = 4;
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
@@ -497,9 +480,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
// Fail the connector on startup
props.put("connector.start.inject.error", "true");
@@ -572,11 +552,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(
- NUM_WORKERS,
- "Initial group of workers did not start in time."
- );
-
connect.configureConnector(CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME));
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
@@ -600,9 +575,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest(
CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME),
@@ -634,9 +606,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
// Configure the connector to produce a maximum of 10 messages
@@ -687,9 +656,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
// Create topic and produce 10 messages
connect.kafka().createTopic(TOPIC_NAME);
for (int i = 0; i < 10; i++) {
@@ -754,9 +720,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
// Create a connector with PAUSED initial state
CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest(
CONNECTOR_NAME,
@@ -806,9 +769,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
connect.kafka().createTopic(TOPIC_NAME);
Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
@@ -859,8 +819,6 @@ public class ConnectWorkerIntegrationTest {
.numWorkers(1)
.build();
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(1,
- "Worker did not start in time");
Map<String, String> connectorConfig1 =
defaultSourceConnectorProps(TOPIC_NAME);
Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
@@ -927,8 +885,6 @@ public class ConnectWorkerIntegrationTest {
connect.start();
- connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not
brought up in time");
-
Map<String, String> connectorWithBlockingTaskStopConfig = new
HashMap<>();
connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG,
BlockingConnectorTest.BlockingSourceConnector.class.getName());
connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
@@ -1008,11 +964,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(
- NUM_WORKERS,
- "Initial group of workers did not start in time."
- );
-
Map<String, String> connectorProps =
defaultSourceConnectorProps(TOPIC_NAME);
int maxTasks = 1;
connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
@@ -1177,11 +1128,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(
- numWorkers,
- "Initial group of workers did not start in time."
- );
-
final String connectorTopic = "connector-topic";
connect.kafka().createTopic(connectorTopic, 1);
@@ -1214,7 +1160,7 @@ public class ConnectWorkerIntegrationTest {
connect.assertions().assertAtLeastNumWorkersAreUp(
numWorkers,
- "Initial group of workers did not start in time."
+ "Workers did not start in time after cluster was rolled."
);
final TopicPartition connectorTopicPartition = new
TopicPartition(connectorTopic, 0);
@@ -1270,11 +1216,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(
- numWorkers,
- "Initial group of workers did not start in time."
- );
-
final String firstConnectorTopic = "connector-topic-1";
connect.kafka().createTopic(firstConnectorTopic);
@@ -1344,11 +1285,6 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(
- NUM_WORKERS,
- "Initial group of workers did not start in time."
- );
-
final String topic = "kafka9228";
connect.kafka().createTopic(topic, 1);
connect.kafka().produce(topic, "non-json-value");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index a0abece17d7..f09a949010c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -121,8 +121,6 @@ public class ConnectorClientPolicyIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
return connect;
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
index a512eeaae0a..e957b97f611 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
@@ -113,8 +113,6 @@ public class ConnectorRestartApiIntegrationTest {
connect.start();
return connect;
});
- connect.assertions().assertExactlyNumWorkersAreUp(numWorkers,
- "Initial group of workers did not start in time.");
}
@After
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index eb055ab13fb..e9fdd91c387 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -115,8 +115,6 @@ public class ConnectorTopicsIntegrationTest {
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
-
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR,
Collections.emptyList(),
"Active topic set is not empty for connector: " +
FOO_CONNECTOR);
@@ -179,8 +177,6 @@ public class ConnectorTopicsIntegrationTest {
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
-
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR,
Collections.emptyList(),
"Active topic set is not empty for connector: " +
FOO_CONNECTOR);
@@ -235,8 +231,6 @@ public class ConnectorTopicsIntegrationTest {
connect.kafka().createTopic(FOO_TOPIC, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(BAR_TOPIC, NUM_TOPIC_PARTITIONS);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
-
// start a source connector
connect.configureConnector(FOO_CONNECTOR,
defaultSourceConnectorProps(FOO_TOPIC));
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR,
NUM_TASKS,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 1a76956be61..30fe48116fc 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -94,8 +94,6 @@ public class ErrorHandlingIntegrationTest {
// start Connect cluster
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
// get connector handles before starting test.
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 04dd4c7de67..6d4b648201a 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -500,9 +500,6 @@ public class ExactlyOnceSourceIntegrationTest {
connectorHandle.expectedRecords(MINIMUM_MESSAGES);
connectorHandle.expectedCommits(MINIMUM_MESSAGES);
- // make sure the worker is actually up (otherwise, it may fence out
our simulated zombie leader, instead of the other way around)
- connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker
did not complete startup in time");
-
// fence out the leader of the cluster
Producer<?, ?> zombieLeader = transactionalProducer(
"simulated-zombie-leader",
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
index d73d1c4ed06..c044bb82298 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
@@ -72,8 +72,6 @@ public class InternalTopicsIntegrationTest {
// Start the Connect cluster
connect.start();
- connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers
did not start in time.");
- connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker
did not start in time.");
log.info("Completed startup of {} Kafka brokers and {} Connect
workers", numBrokers, numWorkers);
// Check the topics
@@ -111,9 +109,6 @@ public class InternalTopicsIntegrationTest {
// Start the Connect cluster
connect.start();
- connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker
did not start in time.");
- connect.assertions().assertAtLeastNumWorkersAreUp(numWorkers, "Worker
did not start in time.");
- log.info("Completed startup of {} Kafka brokers and {} Connect
workers", numBrokers, numWorkers);
// Check the topics
log.info("Verifying the internal topics for Connect");
@@ -126,7 +121,7 @@ public class InternalTopicsIntegrationTest {
workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG,
"3");
workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG,
"2");
workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG,
"1");
- int numWorkers = 1;
+ int numWorkers = 0;
int numBrokers = 1;
connect = new
EmbeddedConnectCluster.Builder().name("connect-cluster-1")
.workerProps(workerProps)
@@ -137,11 +132,14 @@ public class InternalTopicsIntegrationTest {
// Start the brokers and Connect, but Connect should fail to create
config and offset topic
connect.start();
- connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker
did not start in time.");
log.info("Completed startup of {} Kafka broker. Expected Connect
worker to fail", numBrokers);
+ // Try to start a worker
+ connect.addWorker();
+
// Verify that the offset and config topic don't exist;
// the status topic may have been created if timing was right but we
don't care
+ // TODO: Synchronously await and verify that the worker fails during
startup
log.info("Verifying the internal topics for Connect");
connect.assertions().assertTopicsDoNotExist(configTopic(),
offsetTopic());
}
@@ -169,7 +167,6 @@ public class InternalTopicsIntegrationTest {
// Start the brokers but not Connect
log.info("Starting {} Kafka brokers, but no Connect workers yet",
numBrokers);
connect.start();
- connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker
did not start in time.");
log.info("Completed startup of {} Kafka broker. Expected Connect
worker to fail", numBrokers);
// Create the good topics
@@ -243,7 +240,6 @@ public class InternalTopicsIntegrationTest {
// Start the brokers but not Connect
log.info("Starting {} Kafka brokers, but no Connect workers yet",
numBrokers);
connect.start();
- connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker
did not start in time.");
log.info("Completed startup of {} Kafka broker. Expected Connect
worker to fail", numBrokers);
// Create the valid internal topics w/o topic settings, so these will
use the broker's
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index 2da52cf9abd..dc507b68df7 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -143,15 +143,6 @@ public class OffsetsApiIntegrationTest {
result.start();
- try {
- result.assertions().assertExactlyNumWorkersAreUp(
- NUM_WORKERS,
- "Workers did not complete startup in time"
- );
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while awaiting cluster
startup", e);
- }
-
return result;
});
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 82004c8dc3a..4c803a34921 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -116,9 +116,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
// setup up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Connect workers did not start in time.");
-
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -147,9 +144,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
// setup up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Connect workers did not start in time.");
-
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -194,9 +188,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
// setup up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Connect workers did not start in time.");
-
// start several source connectors
IntStream.range(0, 4).forEachOrdered(i ->
connect.configureConnector(CONNECTOR_NAME + i, props));
@@ -221,9 +212,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
// setup up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Connect workers did not start in time.");
-
// start a source connector
IntStream.range(0, 4).forEachOrdered(i ->
connect.configureConnector(CONNECTOR_NAME + i, props));
@@ -250,9 +238,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
// setup up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
- "Connect workers did not start in time.");
-
// start a source connector
IntStream.range(0, 4).forEachOrdered(i ->
connect.configureConnector(CONNECTOR_NAME + i, props));
@@ -276,9 +261,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
// setup up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
- connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
- "Connect workers did not start in time.");
-
// start a source connector
IntStream.range(0, 4).forEachOrdered(i ->
connect.configureConnector(CONNECTOR_NAME + i, props));
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index a0f993f2f63..112dd219811 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -77,9 +77,6 @@ public class RestExtensionIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
-
WorkerHandle worker = connect.workers().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("At least one worker handle
should be available"));
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
index 7ced24c82f8..8f71033b798 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
@@ -19,9 +19,7 @@ package org.apache.kafka.connect.integration;
import
org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
-import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -98,10 +96,6 @@ public class SessionedProtocolIntegrationTest {
invalidSignatureHeaders.put(SIGNATURE_HEADER, "S2Fma2Flc3F1ZQ==");
invalidSignatureHeaders.put(SIGNATURE_ALGORITHM_HEADER, "HmacSHA256");
- TestUtils.waitForCondition(
- () ->
connect.workers().stream().allMatch(WorkerHandle::isRunning),
- 30000L, "Timed out waiting for workers to start");
-
// We haven't created the connector yet, but this should still return
a 400 instead of a 404
// if the endpoint is secured
log.info(
@@ -120,9 +114,10 @@ public class SessionedProtocolIntegrationTest {
+ "expecting 403 error response",
connectorTasksEndpoint
);
- TestUtils.waitForCondition(
- () -> connect.requestPost(connectorTasksEndpoint, "[]",
invalidSignatureHeaders).getStatus() == FORBIDDEN.getStatusCode(),
- 30000L, "Timed out waiting for workers to start");
+ assertEquals(
+ FORBIDDEN.getStatusCode(),
+ connect.requestPost(connectorTasksEndpoint, "[]",
invalidSignatureHeaders).getStatus()
+ );
// Create the connector now
// setup up props for the sink connector
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
index a8bfbb291ff..f42addf396c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
@@ -83,7 +83,6 @@ public class SinkConnectorsIntegrationTest {
.brokerProps(brokerProps)
.build();
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
}
@After
@@ -209,7 +208,6 @@ public class SinkConnectorsIntegrationTest {
final Collection<String> topics = Arrays.asList(topic1, topic2,
topic3);
Map<String, String> connectorProps =
baseSinkConnectorProps(String.join(",", topics));
- // Need an eager assignor here; round robin is as good as any
connectorProps.put(
CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX +
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
index b35b0720802..046e5a9ccf7 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
@@ -103,8 +103,6 @@ public class SourceConnectorsIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
-
Map<String, String> fooProps =
sourceConnectorPropsWithGroups(FOO_TOPIC);
// start a source connector
@@ -128,8 +126,6 @@ public class SourceConnectorsIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
-
Map<String, String> fooProps =
sourceConnectorPropsWithGroups(FOO_TOPIC);
// start a source connector
@@ -160,8 +156,6 @@ public class SourceConnectorsIntegrationTest {
connect.assertions().assertTopicSettings(BAR_TOPIC,
DEFAULT_REPLICATION_FACTOR,
DEFAULT_PARTITIONS, "Topic " + BAR_TOPIC + " does not have the
expected settings");
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
-
Map<String, String> barProps = defaultSourceConnectorProps(BAR_TOPIC);
// start a source connector with topic creation properties
connect.configureConnector(BAR_CONNECTOR, barProps);
@@ -195,7 +189,7 @@ public class SourceConnectorsIntegrationTest {
workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(true));
IntStream.range(0, 3).forEach(i -> connect.addWorker());
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Workers did not start in time after cluster was rolled.");
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(FOO_CONNECTOR,
NUM_TASKS,
"Connector tasks did not start in time.");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
index 02d8c7f71b1..760684425df 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
@@ -115,8 +115,6 @@ public class TransformationIntegrationTest {
*/
@Test
public void testFilterOnTopicNameWithSinkConnector() throws Exception {
- assertConnectReady();
-
Map<String, Long> observedRecords = observeRecords();
// create test topics
@@ -180,12 +178,6 @@ public class TransformationIntegrationTest {
connect.deleteConnector(CONNECTOR_NAME);
}
- private void assertConnectReady() throws InterruptedException {
- connect.assertions().assertExactlyNumBrokersAreUp(1, "Brokers did not
start in time.");
- connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, "Worker
did not start in time.");
- log.info("Completed startup of {} Kafka brokers and {} Connect
workers", 1, NUM_WORKERS);
- }
-
private void assertConnectorRunning() throws InterruptedException {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
NUM_TASKS,
"Connector tasks did not start in time.");
@@ -212,8 +204,6 @@ public class TransformationIntegrationTest {
*/
@Test
public void testFilterOnTombstonesWithSinkConnector() throws Exception {
- assertConnectReady();
-
Map<String, Long> observedRecords = observeRecords();
// create test topics
@@ -273,8 +263,6 @@ public class TransformationIntegrationTest {
*/
@Test
public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation()
throws Exception {
- assertConnectReady();
-
// setup up props for the sink connector
Map<String, String> props = new HashMap<>();
props.put("name", CONNECTOR_NAME);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index a37de76d1ae..e7e268425c7 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -118,7 +118,8 @@ abstract class EmbeddedConnect {
};
/**
- * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
+ * Start the Connect cluster and the embedded Kafka and Zookeeper cluster,
+ * and wait for the Kafka and Connect clusters to become healthy.
*/
public void start() {
if (maskExitProcedures) {
@@ -132,6 +133,27 @@ abstract class EmbeddedConnect {
} catch (Exception e) {
throw new ConnectException("Failed to start HTTP client", e);
}
+
+ try {
+ if (numBrokers > 0) {
+ assertions().assertExactlyNumBrokersAreUp(
+ numBrokers,
+ "Kafka cluster did not start in time"
+ );
+ log.info("Completed startup of {} Kafka brokers", numBrokers);
+ }
+
+ int numWorkers = workers().size();
+ if (numWorkers > 0) {
+ assertions().assertExactlyNumWorkersAreUp(
+ numWorkers,
+ "Connect cluster did not start in time"
+ );
+ log.info("Completed startup of {} Connect workers",
numWorkers);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while awaiting cluster
startup", e);
+ }
}
/**