This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new b9ff91b  MINOR: Small Connect integration test fixes (#8100)
b9ff91b is described below

commit b9ff91b3edaea46685723605cf6a1067b57cd300
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Wed Feb 12 15:40:37 2020 -0800

    MINOR: Small Connect integration test fixes (#8100)
    
    Author: Konstantine Karantasis <konstant...@confluent.io>
    Reviewer: Randall Hauch <rha...@gmail.com>
---
 .../connect/integration/ConnectWorkerIntegrationTest.java     |  3 +--
 .../connect/integration/ErrorHandlingIntegrationTest.java     | 11 +++++++++--
 .../connect/integration/ExampleConnectIntegrationTest.java    |  3 +--
 .../connect/integration/RestExtensionIntegrationTest.java     | 11 ++++++++---
 .../kafka/connect/util/clusters/EmbeddedConnectCluster.java   |  8 +++++---
 5 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index ad0bcbc..cbdaf74 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -60,7 +59,7 @@ public class ConnectWorkerIntegrationTest {
     Properties brokerProps = new Properties();
 
     @Before
-    public void setup() throws IOException {
+    public void setup() {
         // setup Connect worker properties
         workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 33e6cf5..8963b8c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -69,6 +68,7 @@ public class ErrorHandlingIntegrationTest {
 
     private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
 
+    private static final int NUM_WORKERS = 1;
     private static final String DLQ_TOPIC = "my-connector-errors";
     private static final String CONNECTOR_NAME = "error-conn";
     private static final String TASK_ID = "error-conn-0";
@@ -83,12 +83,14 @@ public class ErrorHandlingIntegrationTest {
     private ConnectorHandle connectorHandle;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() throws InterruptedException {
         // setup Connect cluster with defaults
         connect = new EmbeddedConnectCluster.Builder().build();
 
         // start Connect cluster
         connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
 
         // get connector handles before starting test.
         connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
@@ -134,6 +136,8 @@ public class ErrorHandlingIntegrationTest {
         connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
 
         connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         waitForCondition(this::checkForPartitionAssignment,
                 CONNECTOR_SETUP_DURATION_MS,
@@ -172,6 +176,9 @@ public class ErrorHandlingIntegrationTest {
         }
 
         connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+                "Connector tasks did not stop in time.");
+
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index bd06291..14808c7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -66,7 +65,7 @@ public class ExampleConnectIntegrationTest {
     private ConnectorHandle connectorHandle;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() {
         // setup Connect worker properties
         Map<String, String> exampleWorkerProps = new HashMap<>();
         exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index 3a6bc3a..a50019f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -57,11 +56,12 @@ public class RestExtensionIntegrationTest {
 
     private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
     private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
+    private static final int NUM_WORKERS = 1;
 
     private EmbeddedConnectCluster connect;
 
     @Test
-    public void testRestExtensionApi() throws IOException, InterruptedException {
+    public void testRestExtensionApi() throws InterruptedException {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
@@ -69,7 +69,7 @@ public class RestExtensionIntegrationTest {
         // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
             .name("connect-cluster")
-            .numWorkers(1)
+            .numWorkers(NUM_WORKERS)
             .numBrokers(1)
             .workerProps(workerProps)
             .build();
@@ -77,6 +77,9 @@ public class RestExtensionIntegrationTest {
         // start the clusters
         connect.start();
 
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
         WorkerHandle worker = connect.workers().stream()
             .findFirst()
             .orElseThrow(() -> new AssertionError("At least one worker handle should be available"));
@@ -99,6 +102,8 @@ public class RestExtensionIntegrationTest {
             connectorHandle.taskHandle(connectorHandle.name() + "-0");
             StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1);
             connect.configureConnector(connectorHandle.name(), connectorProps);
+            connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(), 1,
+                    "Connector tasks did not start in time.");
             connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 
             String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 8e2736d..89c97e6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -284,7 +284,8 @@ public class EmbeddedConnectCluster {
         if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
             return responseToString(response);
         }
-        throw new ConnectRestException(response.getStatus(), "Could not execute PUT request");
+        throw new ConnectRestException(response.getStatus(),
+                "Could not execute PUT request. Error response: " + responseToString(response));
     }
 
     /**
@@ -298,7 +299,8 @@ public class EmbeddedConnectCluster {
         String url = endpointForResource(String.format("connectors/%s", connName));
         Response response = requestDelete(url);
         if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
-            throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request.");
+            throw new ConnectRestException(response.getStatus(),
+                    "Could not execute DELETE request. Error response: " + responseToString(response));
         }
     }
 
@@ -358,7 +360,7 @@ public class EmbeddedConnectCluster {
      *
      * @param resource the resource under the worker's admin endpoint
      * @return the admin endpoint URL
-     * @throws ConnectRestException if no REST endpoint is available
+     * @throws ConnectException if no REST endpoint is available
      */
     public String endpointForResource(String resource) {
         String url = connectCluster.stream()
