This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4cad35  KAFKA-8014: Extend Connect integration tests to add and 
remove workers dynamically (#6342)
e4cad35 is described below

commit e4cad353124478fd4034f18d2292ba62fb702924
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Mon Mar 25 07:29:33 2019 -0700

    KAFKA-8014: Extend Connect integration tests to add and remove workers 
dynamically (#6342)
    
    Extend Connect's integration test framework to add workers to, or remove workers from, an EmbeddedConnectCluster, and to choose whether to fail the test on ungraceful service shutdown. Also add more JavaDoc and other minor improvements.
    
    Author: Konstantine Karantasis <konstant...@confluent.io>
    
    Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch 
<rha...@gmail.com>
    
    Closes #6342 from kkonstantine/KAFKA-8014
---
 checkstyle/import-control.xml                      |   1 +
 .../connect/runtime/rest/entities/ServerInfo.java  |  14 +-
 .../integration/ConnectWorkerIntegrationTest.java  | 177 +++++++++++++++++
 .../kafka/connect/integration/ConnectorHandle.java | 106 ++++++++--
 .../integration/ErrorHandlingIntegrationTest.java  |   2 +-
 .../integration/ExampleConnectIntegrationTest.java |  69 ++++++-
 .../integration/MonitorableSinkConnector.java      |  44 ++++-
 .../integration/MonitorableSourceConnector.java    | 160 +++++++++++++++
 .../kafka/connect/integration/TaskHandle.java      |  98 ++++++++--
 .../util/clusters/EmbeddedConnectCluster.java      | 214 ++++++++++++++++++---
 .../clusters/UngracefulShutdownException.java}     |  37 ++--
 .../kafka/connect/util/clusters/WorkerHandle.java  | 103 ++++++++++
 12 files changed, 938 insertions(+), 87 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ffc9bf9..f751aad 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -381,6 +381,7 @@
     <subpackage name="integration">
       <allow pkg="org.apache.kafka.connect.util.clusters" />
       <allow pkg="org.apache.kafka.connect" />
+      <allow pkg="org.apache.kafka.tools" />
     </subpackage>
 
     <subpackage name="json">
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
index a12751c..e5c5553 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.entities;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kafka.common.utils.AppInfoParser;
 
@@ -24,12 +25,19 @@ public class ServerInfo {
     private final String commit;
     private final String kafkaClusterId;
 
-    public ServerInfo(String kafkaClusterId) {
-        this.version = AppInfoParser.getVersion();
-        this.commit = AppInfoParser.getCommitId();
+    @JsonCreator
+    private ServerInfo(@JsonProperty("version") String version,
+                       @JsonProperty("commit") String commit,
+                       @JsonProperty("kafka_cluster_id") String 
kafkaClusterId) {
+        this.version = version;
+        this.commit = commit;
         this.kafkaClusterId = kafkaClusterId;
     }
 
+    public ServerInfo(String kafkaClusterId) {
+        this(AppInfoParser.getVersion(), AppInfoParser.getCommitId(), 
kafkaClusterId);
+    }
+
     @JsonProperty
     public String version() {
         return version;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
new file mode 100644
index 0000000..09363cd
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.WorkerHandle;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test simple operations on the workers of a Connect cluster.
+ */
+@Category(IntegrationTest.class)
+public class ConnectWorkerIntegrationTest {
+    private static final Logger log = 
LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);
+
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final int CONNECTOR_SETUP_DURATION_MS = 15_000;
+    private static final int WORKER_SETUP_DURATION_MS = 20_000;
+    private static final int OFFSET_COMMIT_INTERVAL_MS = 30_000;
+    private static final int NUM_WORKERS = 3;
+    private static final String CONNECTOR_NAME = "simple-source";
+
+    private EmbeddedConnectCluster connect;
+
+    @Before
+    public void setup() throws IOException {
+        // setup Connect worker properties
+        Map<String, String> exampleWorkerProps = new HashMap<>();
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", 
String.valueOf(false));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .workerProps(exampleWorkerProps)
+                .brokerProps(exampleBrokerProps)
+                .maskExitProcedures(true) // true is the default, setting here 
as example
+                .build();
+
+        // start the clusters
+        connect.start();
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    /**
+     * Simple test case to add and then remove a worker from the embedded 
Connect cluster while
+     * running a simple source connector.
+     */
+    @Test
+    public void testAddAndRemoveWorker() throws Exception {
+        int numTasks = 4;
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // set up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
+
+        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
+                WORKER_SETUP_DURATION_MS, "Initial group of workers did not 
start in time.");
+
+        // start a source connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, 
numTasks).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in 
time.");
+
+        WorkerHandle extraWorker = connect.addWorker();
+
+        waitForCondition(() -> assertWorkersUp(NUM_WORKERS + 1).orElse(false),
+                WORKER_SETUP_DURATION_MS, "Expanded group of workers did not 
start in time.");
+
+        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, 
numTasks).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in 
running state.");
+
+        Set<WorkerHandle> workers = connect.activeWorkers();
+        assertTrue(workers.contains(extraWorker));
+
+        connect.removeWorker(extraWorker);
+
+        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false) && 
!assertWorkersUp(NUM_WORKERS + 1).orElse(false),
+                WORKER_SETUP_DURATION_MS, "Group of workers did not shrink in 
time.");
+
+        workers = connect.activeWorkers();
+        assertFalse(workers.contains(extraWorker));
+    }
+
+    /**
+     * Confirm that the requested number of workers is up and running.
+     *
+     * @param numWorkers the number of online workers
+     * @return true if at least {@code numWorkers} are up; false otherwise
+     */
+    private Optional<Boolean> assertWorkersUp(int numWorkers) {
+        try {
+            int numUp = connect.activeWorkers().size();
+            return Optional.of(numUp >= numWorkers);
+        } catch (Exception e) {
+            log.error("Could not check active workers.", e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Confirm that a connector with an exact number of tasks is running.
+     *
+     * @param connectorName the connector
+     * @param numTasks the expected number of tasks
+     * @return true if the connector and tasks are in RUNNING state; false 
otherwise
+     */
+    private Optional<Boolean> assertConnectorAndTasksRunning(String 
connectorName, int numTasks) {
+        try {
+            ConnectorStateInfo info = connect.connectorStatus(connectorName);
+            boolean result = info != null
+                    && info.tasks().size() == numTasks
+                    && 
info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    && info.tasks().stream().allMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
+            return Optional.of(result);
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return Optional.empty();
+        }
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
index e59691b..0df0f8c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 /**
  * A handle to a connector executing in a Connect cluster.
@@ -37,7 +38,9 @@ public class ConnectorHandle {
     private final Map<String, TaskHandle> taskHandles = new 
ConcurrentHashMap<>();
 
     private CountDownLatch recordsRemainingLatch;
+    private CountDownLatch recordsToCommitLatch;
     private int expectedRecords = -1;
+    private int expectedCommits = -1;
 
     public ConnectorHandle(String connectorName) {
         this.connectorName = connectorName;
@@ -54,6 +57,20 @@ public class ConnectorHandle {
         return taskHandles.computeIfAbsent(taskId, k -> new TaskHandle(this, 
taskId));
     }
 
+    /**
+     * Get the connector's name corresponding to this handle.
+     *
+     * @return the connector's name
+     */
+    public String name() {
+        return connectorName;
+    }
+
+    /**
+     * Get the task handles monitored by this connector handle.
+     *
+     * @return the collection of task handles
+     */
     public Collection<TaskHandle> tasks() {
         return taskHandles.values();
     }
@@ -69,13 +86,23 @@ public class ConnectorHandle {
     }
 
     /**
-     * Set the number of expected records for this task.
+     * Set the number of expected records for this connector.
+     *
+     * @param expected number of records
+     */
+    public void expectedRecords(int expected) {
+        expectedRecords = expected;
+        recordsRemainingLatch = new CountDownLatch(expected);
+    }
+
+    /**
+     * Set the number of expected commits performed by this connector.
      *
-     * @param expectedRecords number of records
+     * @param expected number of commits
      */
-    public void expectedRecords(int expectedRecords) {
-        this.expectedRecords = expectedRecords;
-        this.recordsRemainingLatch = new CountDownLatch(expectedRecords);
+    public void expectedCommits(int expected) {
+        expectedCommits = expected;
+        recordsToCommitLatch = new CountDownLatch(expected);
     }
 
     /**
@@ -88,25 +115,80 @@ public class ConnectorHandle {
     }
 
     /**
-     * Wait for this task to receive the expected number of records.
+     * Record arrival of a batch of messages at the connector.
      *
-     * @param consumeMaxDurationMs max duration to wait for records
+     * @param batchSize the number of messages
+     */
+    public void record(int batchSize) {
+        if (recordsRemainingLatch != null) {
+            IntStream.range(0, batchSize).forEach(i -> 
recordsRemainingLatch.countDown());
+        }
+    }
+
+    /**
+     * Record a message commit from the connector.
+     */
+    public void commit() {
+        if (recordsToCommitLatch != null) {
+            recordsToCommitLatch.countDown();
+        }
+    }
+
+    /**
+     * Record commit on a batch of messages from the connector.
+     *
+     * @param batchSize the number of messages
+     */
+    public void commit(int batchSize) {
+        if (recordsToCommitLatch != null) {
+            IntStream.range(0, batchSize).forEach(i -> 
recordsToCommitLatch.countDown());
+        }
+    }
+
+    /**
+     * Wait for this connector to meet the expected number of records as 
defined by {@code
+     * expectedRecords}.
+     *
+     * @param timeout max duration to wait for records
      * @throws InterruptedException if another threads interrupts this one 
while waiting for records
      */
-    public void awaitRecords(int consumeMaxDurationMs) throws 
InterruptedException {
+    public void awaitRecords(int timeout) throws InterruptedException {
         if (recordsRemainingLatch == null || expectedRecords < 0) {
-            throw new IllegalStateException("expectedRecords() was not set for 
this task?");
+            throw new IllegalStateException("expectedRecords() was not set for 
this connector?");
         }
-        if (!recordsRemainingLatch.await(consumeMaxDurationMs, 
TimeUnit.MILLISECONDS)) {
-            String msg = String.format("Insufficient records seen by connector 
%s in %d millis. Records expected=%d, actual=%d",
+        if (!recordsRemainingLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+            String msg = String.format(
+                    "Insufficient records seen by connector %s in %d millis. 
Records expected=%d, actual=%d",
                     connectorName,
-                    consumeMaxDurationMs,
+                    timeout,
                     expectedRecords,
                     expectedRecords - recordsRemainingLatch.getCount());
             throw new DataException(msg);
         }
     }
 
+     /**
+     * Wait for this connector to meet the expected number of commits as 
defined by {@code
+     * expectedCommits}.
+     *
+     * @param timeout max duration to wait for commits
+     * @throws InterruptedException if another thread interrupts this one while waiting for commits
+     */
+    public void awaitCommits(int timeout) throws InterruptedException {
+        if (recordsToCommitLatch == null || expectedCommits < 0) {
+            throw new IllegalStateException("expectedCommits() was not set for 
this connector?");
+        }
+        if (!recordsToCommitLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+            String msg = String.format(
+                    "Insufficient records committed by connector %s in %d 
millis. Records expected=%d, actual=%d",
+                    connectorName,
+                    timeout,
+                    expectedCommits,
+                    expectedCommits - recordsToCommitLatch.getCount());
+            throw new DataException(msg);
+        }
+    }
+
     @Override
     public String toString() {
         return "ConnectorHandle{" +
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 5f7cfc9..67bcc74 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -106,7 +106,7 @@ public class ErrorHandlingIntegrationTest {
 
         // setup connector config
         Map<String, String> props = new HashMap<>();
-        props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSinkConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put(TOPICS_CONFIG, "test-topic");
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 0648e9f..224d6ac 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -40,6 +40,7 @@ import static 
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * An example integration test that demonstrates how to setup an integration 
test for Connect.
@@ -54,9 +55,10 @@ public class ExampleConnectIntegrationTest {
 
     private static final int NUM_RECORDS_PRODUCED = 2000;
     private static final int NUM_TOPIC_PARTITIONS = 3;
-    private static final int CONSUME_MAX_DURATION_MS = 5000;
+    private static final int RECORD_TRANSFER_DURATION_MS = 5000;
     private static final int CONNECTOR_SETUP_DURATION_MS = 15000;
     private static final int NUM_TASKS = 3;
+    private static final int NUM_WORKERS = 3;
     private static final String CONNECTOR_NAME = "simple-conn";
 
     private EmbeddedConnectCluster connect;
@@ -66,16 +68,16 @@ public class ExampleConnectIntegrationTest {
     public void setup() throws IOException {
         // setup Connect worker properties
         Map<String, String> exampleWorkerProps = new HashMap<>();
-        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000");
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
 
         // setup Kafka broker properties
         Properties exampleBrokerProps = new Properties();
         exampleBrokerProps.put("auto.create.topics.enable", "false");
 
-        // build a Connect cluster backed by Kakfa and Zk
+        // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
-                .name("example-cluster")
-                .numWorkers(3)
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
                 .numBrokers(1)
                 .workerProps(exampleWorkerProps)
                 .brokerProps(exampleBrokerProps)
@@ -93,7 +95,7 @@ public class ExampleConnectIntegrationTest {
         // delete connector handle
         RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
 
-        // stop all Connect, Kakfa and Zk threads.
+        // stop all Connect, Kafka and Zk threads.
         connect.stop();
     }
 
@@ -102,13 +104,13 @@ public class ExampleConnectIntegrationTest {
      * records, and start up a sink connector which will consume these records.
      */
     @Test
-    public void testProduceConsumeConnector() throws Exception {
+    public void testSinkConnector() throws Exception {
         // create test topic
         connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
 
         // setup up props for the sink connector
         Map<String, String> props = new HashMap<>();
-        props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSinkConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put(TOPICS_CONFIG, "test-topic");
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
@@ -117,6 +119,9 @@ public class ExampleConnectIntegrationTest {
         // expect all records to be consumed by the connector
         connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
 
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
         // start a sink connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
@@ -131,10 +136,54 @@ public class ExampleConnectIntegrationTest {
 
         // consume all records from the source topic or fail, to ensure that 
they were correctly produced.
         assertEquals("Unexpected number of records consumed", 
NUM_RECORDS_PRODUCED,
-                connect.kafka().consume(NUM_RECORDS_PRODUCED, 
CONSUME_MAX_DURATION_MS, "test-topic").count());
+                connect.kafka().consume(NUM_RECORDS_PRODUCED, 
RECORD_TRANSFER_DURATION_MS, "test-topic").count());
 
         // wait for the connector tasks to consume all records.
-        connectorHandle.awaitRecords(CONSUME_MAX_DURATION_MS);
+        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
+
+        // wait for the connector tasks to commit all records.
+        connectorHandle.awaitCommits(CONNECTOR_SETUP_DURATION_MS);
+
+        // delete connector
+        connect.deleteConnector(CONNECTOR_NAME);
+    }
+
+    /**
+     * Simple test case to configure and execute an embedded Connect cluster. The test will start up a source connector
+     * which will produce records, and then consume these records from the test topic to verify them.
+     */
+    @Test
+    public void testSourceConnector() throws Exception {
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // set up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("topic", "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
+
+        // expect all records to be produced by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
+
+        // start a source connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        // wait for the connector tasks to produce enough records
+        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
+
+        // wait for the connector tasks to commit enough records
+        connectorHandle.awaitCommits(CONNECTOR_SETUP_DURATION_MS);
+
+        // consume all records from the source topic or fail, to ensure that 
they were correctly produced
+        int recordNum = connect.kafka().consume(NUM_RECORDS_PRODUCED, 
RECORD_TRANSFER_DURATION_MS, "test-topic").count();
+        assertTrue("Not enough records produced by source connector. Expected 
at least: " + NUM_RECORDS_PRODUCED + " + but got " + recordNum,
+                recordNum >= NUM_RECORDS_PRODUCED);
 
         // delete connector
         connect.deleteConnector(CONNECTOR_NAME);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
index 23a8d99..06145de 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
@@ -28,11 +29,15 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
- * A connector to be used in integration tests. This class provides methods to 
find task instances
+ * A sink connector that is used in Apache Kafka integration tests to verify 
the behavior of the
+ * Connect framework, but that can be used in other integration tests as a 
simple connector that
+ * consumes and counts records. This class provides methods to find task 
instances
  * which are initiated by the embedded connector, and wait for them to consume 
a desired number of
  * messages.
  */
@@ -41,10 +46,12 @@ public class MonitorableSinkConnector extends 
TestSinkConnector {
     private static final Logger log = 
LoggerFactory.getLogger(MonitorableSinkConnector.class);
 
     private String connectorName;
+    private Map<String, String> commonConfigs;
 
     @Override
     public void start(Map<String, String> props) {
         connectorName = props.get("name");
+        commonConfigs = props;
         log.info("Starting connector {}", props.get("name"));
     }
 
@@ -57,7 +64,7 @@ public class MonitorableSinkConnector extends 
TestSinkConnector {
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         List<Map<String, String>> configs = new ArrayList<>();
         for (int i = 0; i < maxTasks; i++) {
-            Map<String, String> config = new HashMap<>();
+            Map<String, String> config = new HashMap<>(commonConfigs);
             config.put("connector.name", connectorName);
             config.put("task.id", connectorName + "-" + i);
             configs.add(config);
@@ -79,6 +86,15 @@ public class MonitorableSinkConnector extends 
TestSinkConnector {
         private String connectorName;
         private String taskId;
         private TaskHandle taskHandle;
+        private Set<TopicPartition> assignments;
+        private Map<TopicPartition, Long> committedOffsets;
+        private Map<String, Map<Integer, TopicPartition>> 
cachedTopicPartitions;
+
+        public MonitorableSinkTask() {
+            this.assignments = new HashSet<>();
+            this.committedOffsets = new HashMap<>();
+            this.cachedTopicPartitions = new HashMap<>();
+        }
 
         @Override
         public String version() {
@@ -96,7 +112,7 @@ public class MonitorableSinkConnector extends 
TestSinkConnector {
         @Override
         public void open(Collection<TopicPartition> partitions) {
             log.debug("Opening {} partitions", partitions.size());
-            super.open(partitions);
+            assignments.addAll(partitions);
             taskHandle.partitionsAssigned(partitions.size());
         }
 
@@ -104,11 +120,33 @@ public class MonitorableSinkConnector extends 
TestSinkConnector {
         public void put(Collection<SinkRecord> records) {
             for (SinkRecord rec : records) {
                 taskHandle.record();
+                TopicPartition tp = cachedTopicPartitions
+                        .computeIfAbsent(rec.topic(), v -> new HashMap<>())
+                        .computeIfAbsent(rec.kafkaPartition(), v -> new 
TopicPartition(rec.topic(), rec.kafkaPartition()));
+                committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0L) 
+ 1);
                 log.trace("Task {} obtained record (key='{}' value='{}')", 
taskId, rec.key(), rec.value());
             }
         }
 
         @Override
+        public Map<TopicPartition, OffsetAndMetadata> 
preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            for (TopicPartition tp : assignments) {
+                Long recordsSinceLastCommit = committedOffsets.get(tp);
+                if (recordsSinceLastCommit == null) {
+                    log.warn("preCommit was called with topic-partition {} 
that is not included "
+                            + "in the assignments of this task {}", tp, 
assignments);
+                } else {
+                    taskHandle.commit(recordsSinceLastCommit.intValue());
+                    log.error("Forwarding to framework request to commit 
additional {} for {}",
+                            recordsSinceLastCommit, tp);
+                    taskHandle.commit((int) (long) recordsSinceLastCommit);
+                    committedOffsets.put(tp, 0L);
+                }
+            }
+            return offsets;
+        }
+
+        @Override
         public void stop() {
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
new file mode 100644
index 0000000..8bc8953
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.runtime.TestSourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.tools.ThroughputThrottler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/**
+ * A source connector that is used in Apache Kafka integration tests to verify the behavior of
+ * the Connect framework, but that can be used in other integration tests as a simple connector
+ * that generates records of a fixed structure. The rate of record production can be adjusted
+ * through the configs 'throughput' and 'messages.per.poll'
+ */
+public class MonitorableSourceConnector extends TestSourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
+
+    private String connectorName;
+    private ConnectorHandle connectorHandle;
+    private Map<String, String> commonConfigs;
+
+    /**
+     * Resolve this connector's handle from the {@code name} property and remember the
+     * properties so that they can be copied into every task configuration.
+     *
+     * @param props the connector configuration
+     */
+    @Override
+    public void start(Map<String, String> props) {
+        connectorHandle = RuntimeHandles.get().connectorHandle(props.get("name"));
+        connectorName = connectorHandle.name();
+        commonConfigs = props;
+        log.info("Started {} connector {}", this.getClass().getSimpleName(), connectorName);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MonitorableSourceTask.class;
+    }
+
+    /**
+     * Build one configuration per task: a copy of the connector properties plus the connector
+     * name and a task id made of the connector name, a dash and the task index.
+     *
+     * @param maxTasks the number of task configurations to generate
+     * @return the generated task configurations
+     */
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Map<String, String> config = new HashMap<>(commonConfigs);
+            config.put("connector.name", connectorName);
+            config.put("task.id", connectorName + "-" + i);
+            configs.add(config);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
+    }
+
+    @Override
+    public ConfigDef config() {
+        log.info("Configured {} connector {}", this.getClass().getSimpleName(), connectorName);
+        return new ConfigDef();
+    }
+
+    /**
+     * Source task that emits string records numbered with a per-task sequence number and
+     * reports produced and committed records to its {@link TaskHandle}.
+     */
+    public static class MonitorableSourceTask extends SourceTask {
+        private String connectorName;
+        private String taskId;
+        private String topicName;
+        private TaskHandle taskHandle;
+        private volatile boolean stopped;
+        private long startingSeqno;
+        private long seqno;
+        private long throughput;
+        private int batchSize;
+        private ThroughputThrottler throttler;
+
+        @Override
+        public String version() {
+            return "unknown";
+        }
+
+        /**
+         * Read the task settings, look up the task handle and restore the sequence number from
+         * the offset stored under this task's id, if there is one.
+         *
+         * @param props the task configuration produced by the connector
+         */
+        @Override
+        public void start(Map<String, String> props) {
+            taskId = props.get("task.id");
+            connectorName = props.get("connector.name");
+            topicName = props.getOrDefault("topic", "sequential-topic");
+            throughput = Long.parseLong(props.getOrDefault("throughput", "-1"));
+            batchSize = Integer.parseInt(props.getOrDefault("messages.per.poll", "1"));
+            taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
+            Map<String, Object> offset = Optional.ofNullable(
+                    context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
+                    .orElse(Collections.emptyMap());
+            startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
+            // Resume numbering from the stored offset. Leaving seqno at 0 would re-emit sequence
+            // numbers that were already produced and make (seqno - startingSeqno) negative in
+            // poll() after a restart.
+            seqno = startingSeqno;
+            log.info("Started {} task {}", this.getClass().getSimpleName(), taskId);
+            throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+        }
+
+        /**
+         * Produce the next batch of {@code messages.per.poll} records, throttling first when the
+         * throttler asks for it.
+         *
+         * @return the next batch of records, or {@code null} once the task has been stopped
+         */
+        @Override
+        public List<SourceRecord> poll() {
+            if (!stopped) {
+                // The throttler is fed the number of records produced since this task started.
+                if (throttler.shouldThrottle(seqno - startingSeqno, System.currentTimeMillis())) {
+                    throttler.throttle();
+                }
+                taskHandle.record(batchSize);
+                return LongStream.range(0, batchSize)
+                        .mapToObj(i -> new SourceRecord(
+                                Collections.singletonMap("task.id", taskId),
+                                // The incremented sequence number is the stored offset and is
+                                // reused for the key and value of the same record.
+                                Collections.singletonMap("saved", ++seqno),
+                                topicName,
+                                null,
+                                Schema.STRING_SCHEMA,
+                                "key-" + taskId + "-" + seqno,
+                                Schema.STRING_SCHEMA,
+                                "value-" + taskId + "-" + seqno))
+                        .collect(Collectors.toList());
+            }
+            return null;
+        }
+
+        @Override
+        public void commit() {
+            log.info("Task {} committing offsets", taskId);
+            //TODO: save progress outside the offset topic, potentially in the task handle
+        }
+
+        /**
+         * Count one committed record on the task handle.
+         *
+         * @param record the record that was committed
+         */
+        @Override
+        public void commitRecord(SourceRecord record) {
+            log.trace("Committing record: {}", record);
+            taskHandle.commit();
+        }
+
+        @Override
+        public void stop() {
+            log.info("Stopped {} task {}", this.getClass().getSimpleName(), taskId);
+            stopped = true;
+        }
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
index de3d924..6081ea3 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
 /**
  * A handle to an executing task in a worker. Use this class to record 
progress, for example: number of records seen
@@ -37,7 +38,9 @@ public class TaskHandle {
     private final AtomicInteger partitionsAssigned = new AtomicInteger(0);
 
     private CountDownLatch recordsRemainingLatch;
+    private CountDownLatch recordsToCommitLatch;
     private int expectedRecords = -1;
+    private int expectedCommits = -1;
 
     public TaskHandle(ConnectorHandle connectorHandle, String taskId) {
         log.info("Created task {} for connector {}", taskId, connectorHandle);
@@ -46,7 +49,7 @@ public class TaskHandle {
     }
 
     /**
-     * Record a message arrival at the task.
+     * Record a message arrival at the task and the connector overall.
      */
     public void record() {
         if (recordsRemainingLatch != null) {
@@ -56,13 +59,57 @@ public class TaskHandle {
     }
 
     /**
+     * Record arrival of a batch of messages at the task and the connector 
overall.
+     *
+     * @param batchSize the number of messages
+     */
+    public void record(int batchSize) {
+        // The latch only exists once an expected record count has been set; when it does,
+        // count it down once for each message in the batch.
+        CountDownLatch remaining = recordsRemainingLatch;
+        if (remaining != null) {
+            IntStream.rangeClosed(1, batchSize).forEach(ignored -> remaining.countDown());
+        }
+        connectorHandle.record(batchSize);
+    }
+
+    /**
+     * Record a message commit from the task and the connector overall.
+     */
+    public void commit() {
+        // Count down the commit latch, if one has been set, before informing the connector.
+        CountDownLatch toCommit = recordsToCommitLatch;
+        if (toCommit != null) {
+            toCommit.countDown();
+        }
+        connectorHandle.commit();
+    }
+
+    /**
+     * Record commit on a batch of messages from the task and the connector 
overall.
+     *
+     * @param batchSize the number of messages
+     */
+    public void commit(int batchSize) {
+        // The latch only exists once an expected commit count has been set; when it does,
+        // count it down once for each message in the batch.
+        CountDownLatch toCommit = recordsToCommitLatch;
+        if (toCommit != null) {
+            IntStream.rangeClosed(1, batchSize).forEach(ignored -> toCommit.countDown());
+        }
+        connectorHandle.commit(batchSize);
+    }
+
+    /**
      * Set the number of expected records for this task.
      *
-     * @param expectedRecords number of records
+     * @param expected number of records
+     */
+    public void expectedRecords(int expected) {
+        expectedRecords = expected;
+        recordsRemainingLatch = new CountDownLatch(expected);
+    }
+
+    /**
+     * Set the number of expected record commits performed by this task.
+     *
+     * @param expected number of commits
      */
-    public void expectedRecords(int expectedRecords) {
-        this.expectedRecords = expectedRecords;
-        this.recordsRemainingLatch = new CountDownLatch(expectedRecords);
+    public void expectedCommits(int expected) {
+        // Store into expectedCommits, not expectedRecords: awaitCommits() reports this field,
+        // and awaitRecords() must keep seeing the value set through expectedRecords().
+        expectedCommits = expected;
+        recordsToCommitLatch = new CountDownLatch(expected);
     }
 
     /**
@@ -82,24 +129,51 @@ public class TaskHandle {
     }
 
     /**
-     * Wait for this task to receive the expected number of records.
+     * Wait for this task to meet the expected number of records as defined by 
{@code
+     * expectedRecords}.
      *
-     * @param consumeMaxDurationMs max duration to wait for records
+     * @param  timeout duration to wait for records
      * @throws InterruptedException if another threads interrupts this one 
while waiting for records
      */
-    public void awaitRecords(int consumeMaxDurationMs) throws 
InterruptedException {
+    public void awaitRecords(int timeout) throws InterruptedException {
         if (recordsRemainingLatch == null) {
             throw new IllegalStateException("Illegal state encountered. 
expectedRecords() was not set for this task?");
         }
-        if (!recordsRemainingLatch.await(consumeMaxDurationMs, 
TimeUnit.MILLISECONDS)) {
-            String msg = String.format("Insufficient records seen by task %s 
in %d millis. Records expected=%d, actual=%d",
+        if (!recordsRemainingLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+            String msg = String.format(
+                    "Insufficient records seen by task %s in %d millis. 
Records expected=%d, actual=%d",
                     taskId,
-                    consumeMaxDurationMs,
+                    timeout,
                     expectedRecords,
                     expectedRecords - recordsRemainingLatch.getCount());
             throw new DataException(msg);
         }
-        log.debug("Task {} saw {} records, expected {} records", taskId, 
expectedRecords - recordsRemainingLatch.getCount(), expectedRecords);
+        log.debug("Task {} saw {} records, expected {} records",
+                  taskId, expectedRecords - recordsRemainingLatch.getCount(), 
expectedRecords);
+    }
+
+    /**
+     * Wait for this task to meet the expected number of commits as defined by {@code
+     * expectedCommits}.
+     *
+     * @param  timeout duration to wait for commits, in milliseconds
+     * @throws InterruptedException if another thread interrupts this one while waiting for commits
+     * @throws IllegalStateException if no expected number of commits was set for this task
+     * @throws DataException if the expected number of commits is not reached within the timeout
+     */
+    public void awaitCommits(int timeout) throws InterruptedException {
+        if (recordsToCommitLatch == null) {
+            throw new IllegalStateException("Illegal state encountered. expectedCommits() was not set for this task?");
+        }
+        if (!recordsToCommitLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+            // The commits seen so far are the expected total minus what is left on the latch.
+            String msg = String.format(
+                    "Insufficient records committed by task %s in %d millis. Records expected=%d, actual=%d",
+                    taskId,
+                    timeout,
+                    expectedCommits,
+                    expectedCommits - recordsToCommitLatch.getCount());
+            throw new DataException(msg);
+        }
+        log.debug("Task {} saw {} records committed, expected {} records",
+                  taskId, expectedCommits - recordsToCommitLatch.getCount(), expectedCommits);
     }
 
     @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index b660a1d..590649b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.connect.util.clusters;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.connect.cli.ConnectDistributed;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,10 +32,17 @@ import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -52,7 +59,8 @@ import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.STA
 
 /**
  * Start an embedded connect worker. Internally, this class will spin up a 
Kafka and Zk cluster, setup any tmp
- * directories and clean up them on them.
+ * directories and clean them up when stopped. Methods on the same {@code EmbeddedConnectCluster}
+ * are not guaranteed to be thread-safe.
  */
 public class EmbeddedConnectCluster {
 
@@ -63,24 +71,64 @@ public class EmbeddedConnectCluster {
     private static final Properties DEFAULT_BROKER_CONFIG = new Properties();
     private static final String REST_HOST_NAME = "localhost";
 
-    private final Connect[] connectCluster;
+    private static final String DEFAULT_WORKER_NAME_PREFIX = "connect-worker-";
+
+    private final Set<WorkerHandle> connectCluster;
     private final EmbeddedKafkaCluster kafkaCluster;
     private final Map<String, String> workerProps;
     private final String connectClusterName;
     private final int numBrokers;
-
-    private EmbeddedConnectCluster(String name, Map<String, String> 
workerProps, int numWorkers, int numBrokers, Properties brokerProps) {
+    private final int numInitialWorkers;
+    private final boolean maskExitProcedures;
+    private final String workerNamePrefix;
+    private final AtomicInteger nextWorkerId = new AtomicInteger(0);
+
+    private EmbeddedConnectCluster(String name, Map<String, String> 
workerProps, int numWorkers,
+                                   int numBrokers, Properties brokerProps,
+                                   boolean maskExitProcedures) {
         this.workerProps = workerProps;
         this.connectClusterName = name;
         this.numBrokers = numBrokers;
         this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps);
-        this.connectCluster = new Connect[numWorkers];
+        this.connectCluster = new LinkedHashSet<>();
+        this.numInitialWorkers = numWorkers;
+        this.maskExitProcedures = maskExitProcedures;
+        // leaving non-configurable for now
+        this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX;
     }
 
     /**
+     * A more graceful way to handle abnormal exit of services in integration 
tests.
+     */
+    public Exit.Procedure exitProcedure = (code, message) -> {
+        if (code != 0) {
+            // A non-zero status is turned into an exception so the test JVM is not terminated.
+            String exitMessage = "Abrupt service exit with code " + code + " and message " + message;
+            log.warn(exitMessage);
+            throw new UngracefulShutdownException(exitMessage);
+        }
+        // start() installs this procedure through Exit.setExitProcedure, and Exit.exit dispatches
+        // to whichever procedure is installed. Restore the default first; otherwise the call
+        // below would re-enter this lambda instead of exiting.
+        Exit.resetExitProcedure();
+        Exit.exit(0, message);
+    };
+
+    /**
+     * A more graceful way to handle abnormal halt of services in integration 
tests.
+     */
+    public Exit.Procedure haltProcedure = (code, message) -> {
+        if (code != 0) {
+            // A non-zero status is turned into an exception so the test JVM is not terminated.
+            String haltMessage = "Abrupt service halt with code " + code + " and message " + message;
+            log.warn(haltMessage);
+            throw new UngracefulShutdownException(haltMessage);
+        }
+        // start() installs this procedure through Exit.setHaltProcedure, and Exit.halt dispatches
+        // to whichever procedure is installed. Restore the default first; otherwise the call
+        // below would re-enter this lambda instead of halting.
+        Exit.resetHaltProcedure();
+        Exit.halt(0, message);
+    };
+
+    /**
      * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
      */
     public void start() throws IOException {
+        // Install the masking exit and halt procedures, when requested, before the Kafka
+        // cluster and the Connect workers are started.
+        if (maskExitProcedures) {
+            Exit.setExitProcedure(exitProcedure);
+            Exit.setHaltProcedure(haltProcedure);
+        }
         kafkaCluster.before();
         startConnect();
     }
@@ -90,26 +138,74 @@ public class EmbeddedConnectCluster {
      * Clean up any temp directories created locally.
      */
     public void stop() {
-        for (Connect worker : this.connectCluster) {
-            try {
-                worker.stop();
-            } catch (Exception e) {
-                log.error("Could not stop connect", e);
-                throw new RuntimeException("Could not stop worker", e);
-            }
-        }
-
+        connectCluster.forEach(this::stopWorker);
         try {
             kafkaCluster.after();
+        } catch (UngracefulShutdownException e) {
+            log.warn("Kafka did not shutdown gracefully");
         } catch (Exception e) {
             log.error("Could not stop kafka", e);
             throw new RuntimeException("Could not stop brokers", e);
+        } finally {
+            Exit.resetExitProcedure();
+            Exit.resetHaltProcedure();
+        }
+    }
+
+    /**
+     * Provision and start an additional worker to the Connect cluster.
+     *
+     * @return the worker handle of the worker that was provisioned
+     */
+    public WorkerHandle addWorker() {
+        // Worker names are the fixed prefix followed by the next value of the worker counter.
+        String workerName = workerNamePrefix + nextWorkerId.getAndIncrement();
+        WorkerHandle started = WorkerHandle.start(workerName, workerProps);
+        connectCluster.add(started);
+        log.info("Started worker {}", started);
+        return started;
+    }
+
+    /**
+     * Decommission one of the workers from this Connect cluster. Which worker 
is removed is
+     * implementation dependent and selection is not guaranteed to be 
consistent. Use this method
+     * when you don't care which worker stops.
+     *
+     * @see #removeWorker(WorkerHandle)
+     */
+    public void removeWorker() {
+        // Walk the whole set so that the handle kept at the end is the last one iterated;
+        // it stays null when no worker has been provisioned.
+        WorkerHandle lastWorker = null;
+        Iterator<WorkerHandle> workers = connectCluster.iterator();
+        while (workers.hasNext()) {
+            lastWorker = workers.next();
+        }
+        removeWorker(lastWorker);
+    }
+
+    /**
+     * Decommission a specific worker from this Connect cluster.
+     *
+     * @param worker the handle of the worker to remove from the cluster
+     */
+    public void removeWorker(WorkerHandle worker) {
+        // Also covers the no-argument removeWorker(), which passes null when no worker exists.
+        if (connectCluster.isEmpty()) {
+            throw new IllegalStateException("Cannot remove worker. Cluster is 
empty");
+        }
+        // Stop the worker first, then drop its handle from the cluster.
+        stopWorker(worker);
+        connectCluster.remove(worker);
+    }
+
+    private void stopWorker(WorkerHandle worker) {
+        try {
+            log.info("Stopping worker {}", worker);
+            worker.stop();
+        } catch (UngracefulShutdownException e) {
+            // Thrown by this cluster's exit and halt procedures for a non-zero status; it is
+            // only logged so that an abrupt worker shutdown does not fail the caller.
+            log.warn("Worker {} did not shutdown gracefully", worker);
+        } catch (Exception e) {
+            // Any other failure is logged and rethrown unchecked with the cause preserved.
+            log.error("Could not stop connect", e);
+            throw new RuntimeException("Could not stop worker", e);
+        }
     }
 
     @SuppressWarnings("deprecation")
     public void startConnect() {
-        log.info("Starting Connect cluster with {} workers. clusterName {}", 
connectCluster.length, connectClusterName);
+        log.info("Starting Connect cluster '{}' with {} workers", 
connectClusterName, numInitialWorkers);
 
         workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers());
         workerProps.put(REST_HOST_NAME_CONFIG, REST_HOST_NAME);
@@ -126,12 +222,42 @@ public class EmbeddedConnectCluster {
         putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
         putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
 
-        for (int i = 0; i < connectCluster.length; i++) {
-            connectCluster[i] = new 
ConnectDistributed().startConnect(workerProps);
+        for (int i = 0; i < numInitialWorkers; i++) {
+            addWorker();
         }
     }
 
     /**
+     * Get the workers that are up and running.
+     *
+     * @return the list of handles of the online workers
+     */
+    public Set<WorkerHandle> activeWorkers() {
+        ObjectMapper mapper = new ObjectMapper();
+        return connectCluster.stream()
+                .filter(w -> {
+                    try {
+                        // A worker counts as active when a GET on its URL returns a body that
+                        // can be read as a ServerInfo.
+                        mapper.readerFor(ServerInfo.class)
+                                .readValue(executeGet(w.url().toString()));
+                        return true;
+                    } catch (IOException e) {
+                        // Worker failed to respond. Consider it's offline
+                        return false;
+                    }
+                })
+                // Keep the order in which workers were provisioned, matching the
+                // insertion-ordered set returned by workers().
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    /**
+     * Get the provisioned workers.
+     *
+     * @return the list of handles of the provisioned workers
+     */
+    public Set<WorkerHandle> workers() {
+        // Return a copy so that callers cannot modify the cluster's own set of workers.
+        return new LinkedHashSet<>(connectCluster);
+    }
+
+    /**
      * Configure a connector. If the connector does not already exist, a new 
one will be created and
      * the given configuration will be applied to it.
      *
@@ -171,6 +297,24 @@ public class EmbeddedConnectCluster {
     }
 
     /**
+     * Get the connector names of the connectors currently running on this 
cluster.
+     *
+     * @return the list of connector names
+     * @throws ConnectRestException if the HTTP request to the REST API failed 
with a valid status code.
+     * @throws ConnectException for any other error.
+     */
+    public Collection<String> connectors() {
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            // endpointForResource throws IOException when no worker URL is available, so it is
+            // resolved inside the try block and reported as a ConnectException like read errors.
+            String url = endpointForResource("connectors");
+            return 
mapper.readerFor(Collection.class).readValue(executeGet(url));
+        } catch (IOException e) {
+            log.error("Could not read connector list", e);
+            throw new ConnectException("Could not read connector list", e);
+        }
+    }
+
+    /**
      * Get the status for a connector running in this cluster.
      *
      * @param connectorName name of the connector
@@ -180,8 +324,8 @@ public class EmbeddedConnectCluster {
      */
     public ConnectorStateInfo connectorStatus(String connectorName) {
         ObjectMapper mapper = new ObjectMapper();
-        String url = endpointForResource(String.format("connectors/%s/status", 
connectorName));
         try {
+            String url = 
endpointForResource(String.format("connectors/%s/status", connectorName));
             return 
mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url));
         } catch (IOException e) {
             log.error("Could not read connector state", e);
@@ -189,8 +333,13 @@ public class EmbeddedConnectCluster {
         }
     }
 
-    private String endpointForResource(String resource) {
-        String url = String.valueOf(connectCluster[0].restUrl());
+    private String endpointForResource(String resource) throws IOException {
+        String url = connectCluster.stream()
+                .map(WorkerHandle::url)
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElseThrow(() -> new IOException("Connect workers have not 
been provisioned"))
+                .toString();
         return url + resource;
     }
 
@@ -270,6 +419,7 @@ public class EmbeddedConnectCluster {
         private int numWorkers = DEFAULT_NUM_WORKERS;
         private int numBrokers = DEFAULT_NUM_BROKERS;
         private Properties brokerProps = DEFAULT_BROKER_CONFIG;
+        private boolean maskExitProcedures = false;
 
         public Builder name(String name) {
             this.name = name;
@@ -296,8 +446,26 @@ public class EmbeddedConnectCluster {
             return this;
         }
 
+        /**
+         * In the event of ungraceful shutdown, embedded clusters call exit or halt with non-zero
+         * exit statuses. Exiting with a non-zero status forces a test to fail and is hard to
+         * handle. Because graceful exit is usually not required during a test and because
+         * depending on such an exit increases flakiness, this setting allows masking
+         * exit and halt procedures by using a runtime exception instead. Customization of the
+         * exit and halt procedures is possible through {@code exitProcedure} and {@code
+         * haltProcedure} respectively.
+         *
+         * @param mask if false, exit and halt procedures remain unchanged; if true, they are
+         *             replaced when the cluster is started. NOTE(review): this builder
+         *             initialises the flag to false, although this text previously stated that
+         *             true is the default - confirm which default is intended.
+         * @return the builder for this cluster
+         */
+        public Builder maskExitProcedures(boolean mask) {
+            this.maskExitProcedures = mask;
+            return this;
+        }
+
         public EmbeddedConnectCluster build() {
-            return new EmbeddedConnectCluster(name, workerProps, numWorkers, 
numBrokers, brokerProps);
+            return new EmbeddedConnectCluster(name, workerProps, numWorkers, 
numBrokers,
+                    brokerProps, maskExitProcedures);
         }
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/UngracefulShutdownException.java
similarity index 52%
copy from 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
copy to 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/UngracefulShutdownException.java
index a12751c..2f2b030 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/UngracefulShutdownException.java
@@ -14,34 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.runtime.rest.entities;
+package org.apache.kafka.connect.util.clusters;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.KafkaException;
 
-public class ServerInfo {
-    private final String version;
-    private final String commit;
-    private final String kafkaClusterId;
-
-    public ServerInfo(String kafkaClusterId) {
-        this.version = AppInfoParser.getVersion();
-        this.commit = AppInfoParser.getCommitId();
-        this.kafkaClusterId = kafkaClusterId;
-    }
-
-    @JsonProperty
-    public String version() {
-        return version;
+/**
+ * An exception that can be used from within an {@code Exit.Procedure} to mask 
exit or halt calls
+ * and signify that the service terminated abruptly. It's intended to be used 
only from within
+ * integration tests.
+ */
+public class UngracefulShutdownException extends KafkaException {
+    public UngracefulShutdownException(String s) {
+        super(s);
     }
 
-    @JsonProperty
-    public String commit() {
-        return commit;
+    public UngracefulShutdownException(String s, Throwable throwable) {
+        super(s, throwable);
     }
 
-    @JsonProperty("kafka_cluster_id")
-    public String clusterId() {
-        return kafkaClusterId;
+    public UngracefulShutdownException(Throwable throwable) {
+        super(throwable);
     }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
new file mode 100644
index 0000000..7113f52
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import org.apache.kafka.connect.cli.ConnectDistributed;
+import org.apache.kafka.connect.runtime.Connect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A handle to a worker executing in a Connect cluster.
+ */
+public class WorkerHandle {
+    private static final Logger log = LoggerFactory.getLogger(WorkerHandle.class);
+
+    private final String workerName;
+    private final Connect worker;
+
+    protected WorkerHandle(String workerName, Connect worker) {
+        this.workerName = workerName;
+        this.worker = worker;
+    }
+
+    /**
+     * Create and start a new worker with the given properties.
+     *
+     * @param name a name for this worker
+     * @param workerProperties the worker properties
+     * @return the worker's handle
+     */
+    public static WorkerHandle start(String name, Map<String, String> workerProperties) {
+        return new WorkerHandle(name, new ConnectDistributed().startConnect(workerProperties));
+    }
+
+    /**
+     * Stop this worker.
+     */
+    public void stop() {
+        worker.stop();
+    }
+
+    /**
+     * Get the worker's name corresponding to this handle.
+     *
+     * @return the worker's name
+     */
+    public String name() {
+        return workerName;
+    }
+
+    /**
+     * Get the worker's url that accepts requests to its REST endpoint.
+     *
+     * @return the worker's url
+     */
+    public URI url() {
+        return worker.restUrl();
+    }
+
+    @Override
+    public String toString() {
+        return "WorkerHandle{" +
+                "workerName='" + workerName + '\'' +
+                ", workerURL='" + worker.restUrl() + '\'' +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof WorkerHandle)) {
+            return false;
+        }
+        WorkerHandle that = (WorkerHandle) o;
+        return Objects.equals(workerName, that.workerName) &&
+                Objects.equals(worker, that.worker);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(workerName, worker);
+    }
+}

Reply via email to