This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new b9ff91b MINOR: Small Connect integration test fixes (#8100) b9ff91b is described below commit b9ff91b3edaea46685723605cf6a1067b57cd300 Author: Konstantine Karantasis <konstant...@confluent.io> AuthorDate: Wed Feb 12 15:40:37 2020 -0800 MINOR: Small Connect integration test fixes (#8100) Author: Konstantine Karantasis <konstant...@confluent.io> Reviewer: Randall Hauch <rha...@gmail.com> --- .../connect/integration/ConnectWorkerIntegrationTest.java | 3 +-- .../connect/integration/ErrorHandlingIntegrationTest.java | 11 +++++++++-- .../connect/integration/ExampleConnectIntegrationTest.java | 3 +-- .../connect/integration/RestExtensionIntegrationTest.java | 11 ++++++++--- .../kafka/connect/util/clusters/EmbeddedConnectCluster.java | 8 +++++--- 5 files changed, 24 insertions(+), 12 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index ad0bcbc..cbdaf74 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -60,7 +59,7 @@ public class ConnectWorkerIntegrationTest { Properties brokerProps = new Properties(); @Before - public void setup() throws IOException { + public void setup() { // setup Connect worker properties workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 33e6cf5..8963b8c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -69,6 +68,7 @@ public class ErrorHandlingIntegrationTest { private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class); + private static final int NUM_WORKERS = 1; private static final String DLQ_TOPIC = "my-connector-errors"; private static final String CONNECTOR_NAME = "error-conn"; private static final String TASK_ID = "error-conn-0"; @@ -83,12 +83,14 @@ public class ErrorHandlingIntegrationTest { private ConnectorHandle connectorHandle; @Before - public void setup() throws IOException { + public void setup() throws InterruptedException { // setup Connect cluster with defaults connect = new EmbeddedConnectCluster.Builder().build(); // start Connect cluster connect.start(); + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); // get connector handles before starting test. connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); @@ -134,6 +136,8 @@ public class ErrorHandlingIntegrationTest { connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS); connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); waitForCondition(this::checkForPartitionAssignment, CONNECTOR_SETUP_DURATION_MS, @@ -172,6 +176,9 @@ public class ErrorHandlingIntegrationTest { } connect.deleteConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME, + "Connector tasks did not stop in time."); + } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index bd06291..14808c7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -66,7 +65,7 @@ public class ExampleConnectIntegrationTest { private ConnectorHandle connectorHandle; @Before - public void setup() throws IOException { + public void setup() { // setup Connect worker properties Map<String, String> exampleWorkerProps = new HashMap<>(); exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index 3a6bc3a..a50019f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.core.Response; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -57,11 +56,12 @@ public class RestExtensionIntegrationTest { private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + private static final int NUM_WORKERS = 1; private EmbeddedConnectCluster connect; @Test - public void testRestExtensionApi() throws IOException, InterruptedException { + public void testRestExtensionApi() throws InterruptedException { // setup Connect worker properties Map<String, String> workerProps = new HashMap<>(); workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName()); @@ -69,7 +69,7 @@ public class RestExtensionIntegrationTest { // build a Connect cluster backed by Kafka and Zk connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") - .numWorkers(1) + .numWorkers(NUM_WORKERS) .numBrokers(1) .workerProps(workerProps) .build(); @@ -77,6 +77,9 @@ public class RestExtensionIntegrationTest { // start the clusters connect.start(); + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + WorkerHandle worker = connect.workers().stream() .findFirst() .orElseThrow(() -> new AssertionError("At least one worker handle should be available")); @@ -99,6 +102,8 @@ public class RestExtensionIntegrationTest { connectorHandle.taskHandle(connectorHandle.name() + "-0"); StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1); connect.configureConnector(connectorHandle.name(), connectorProps); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(), 1, + "Connector tasks did not start in time."); connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS); String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 8e2736d..89c97e6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -284,7 +284,8 @@ public class EmbeddedConnectCluster { if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { return responseToString(response); } - throw new ConnectRestException(response.getStatus(), "Could not execute PUT request"); + throw new ConnectRestException(response.getStatus(), + "Could not execute PUT request. Error response: " + responseToString(response)); } /** @@ -298,7 +299,8 @@ public class EmbeddedConnectCluster { String url = endpointForResource(String.format("connectors/%s", connName)); Response response = requestDelete(url); if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { - throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request."); + throw new ConnectRestException(response.getStatus(), + "Could not execute DELETE request. Error response: " + responseToString(response)); } } @@ -358,7 +360,7 @@ public class EmbeddedConnectCluster { * * @param resource the resource under the worker's admin endpoint * @return the admin endpoint URL - * @throws ConnectRestException if no REST endpoint is available + * @throws ConnectException if no REST endpoint is available */ public String endpointForResource(String resource) { String url = connectCluster.stream()