This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7872a1ff5b2 KAFKA-14855: Harden integration testing logic for
asserting that a connector is deleted (#14371)
7872a1ff5b2 is described below
commit 7872a1ff5b2e9a0fbbe3d71180a97e29f1549d4f
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Sep 19 16:39:39 2023 +0100
KAFKA-14855: Harden integration testing logic for asserting that a
connector is deleted (#14371)
Reviewers: Sagar Rao <[email protected]>, Chris Egerton
<[email protected]>
---
.../integration/ConnectWorkerIntegrationTest.java | 8 +++----
.../ConnectorTopicsIntegrationTest.java | 8 +++----
.../integration/ErrorHandlingIntegrationTest.java | 8 +++----
.../RebalanceSourceConnectorsIntegrationTest.java | 4 ++--
.../clusters/EmbeddedConnectClusterAssertions.java | 26 +++++++++-------------
5 files changed, 24 insertions(+), 30 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 2e843cd6ec6..4c393d95ad3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -431,9 +431,9 @@ public class ConnectWorkerIntegrationTest {
// Delete the connector
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(
+ connect.assertions().assertConnectorDoesNotExist(
CONNECTOR_NAME,
- "Connector tasks were not destroyed in time"
+ "Connector wasn't deleted in time"
);
}
@@ -505,9 +505,9 @@ public class ConnectWorkerIntegrationTest {
// Can delete a stopped connector
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(
+ connect.assertions().assertConnectorDoesNotExist(
CONNECTOR_NAME,
- "Connector and all of its tasks should no longer be running"
+ "Connector wasn't deleted in time"
);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index a8b812f8c31..0614ba8a9f7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -150,8 +150,8 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(BAR_CONNECTOR);
- connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(BAR_CONNECTOR,
+ "Connector wasn't deleted in time.");
connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR,
Collections.emptyList(),
"Active topic set is not empty for deleted connector: " +
BAR_CONNECTOR);
@@ -205,8 +205,8 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(FOO_CONNECTOR);
- connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(FOO_CONNECTOR,
+ "Connector wasn't deleted in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR,
Collections.emptyList(),
"Active topic set is not empty for deleted connector: " +
FOO_CONNECTOR);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 7d3c1d6924b..55479e6d4ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -179,8 +179,8 @@ public class ErrorHandlingIntegrationTest {
}
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+ "Connector wasn't deleted in time.");
}
@@ -248,8 +248,8 @@ public class ErrorHandlingIntegrationTest {
ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+ "Connector wasn't deleted in time.");
}
/**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 04e12ea41e0..82004c8dc3a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -206,8 +206,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
// delete connector
connect.deleteConnector(CONNECTOR_NAME + 3);
- connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME + 3,
+ "Connector wasn't deleted in time.");
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index c4ff5018ed1..d8e488f4727 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -455,48 +455,42 @@ public class EmbeddedConnectClusterAssertions {
}
/**
- * Assert that a connector and its tasks are not running.
+ * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted.
*
* @param connectorName the connector name
* @param detailMessage the assertion message
* @throws InterruptedException
*/
- public void assertConnectorAndTasksAreNotRunning(String connectorName, String detailMessage)
+ public void assertConnectorDoesNotExist(String connectorName, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
- () -> checkConnectorAndTasksAreNotRunning(connectorName),
+ () -> checkConnectorDoesNotExist(connectorName),
CONNECTOR_SETUP_DURATION_MS,
- "At least the connector or one of its tasks is still running");
+ "The connector should not exist.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
}
/**
- * Check whether the connector or any of its tasks are still in RUNNING state
+ * Check whether a connector exists by querying the <strong><em>GET /connectors/{connector}/status</em></strong> endpoint
*
- * @param connectorName the connector
- * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
+ * @param connectorName the connector name
+ * @return true if the connector does not exist; false otherwise
*/
- protected boolean checkConnectorAndTasksAreNotRunning(String connectorName) {
- ConnectorStateInfo info;
+ protected boolean checkConnectorDoesNotExist(String connectorName) {
try {
- info = connect.connectorStatus(connectorName);
+ connect.connectorStatus(connectorName);
} catch (ConnectRestException e) {
return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
} catch (Exception e) {
log.error("Could not check connector state info.", e);
return false;
}
- if (info == null) {
- return true;
- }
- return !info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
- && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+ return false;
}
-
/**
* Assert that a connector is in the stopped state and has no tasks.
*