This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7e1fdca64d KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966)
c7e1fdca64d is described below

commit c7e1fdca64dcbd8cedf8aaf826c22566b7485dad
Author: Chris Egerton <chr...@aiven.io>
AuthorDate: Tue Jan 9 10:32:39 2024 -0500

    KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966)
    
    Reviewers: Sudesh Wasnik <swas...@confluent.io>, Sagar Rao <sagarmeansoc...@gmail.com>, Yash Mayya <yash.ma...@gmail.com>, Greg Harris <greg.har...@aiven.io>
---
 .../ConnectorRestartApiIntegrationTest.java        |   6 +-
 .../integration/OffsetsApiIntegrationTest.java     | 464 ++++++++++++---------
 2 files changed, 267 insertions(+), 203 deletions(-)
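
In brief, the patch replaces per-test cluster construction with shared clusters that are created lazily and keyed by worker configuration. A condensed sketch of that reuse pattern (illustrative only; it omits broker properties and other defaults, and the authoritative version is in the diff below):

    private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS =
            new ConcurrentHashMap<>();

    private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) {
        // Build and start a cluster only the first time a given worker config is requested
        return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> {
            EmbeddedConnectCluster cluster = new EmbeddedConnectCluster.Builder()
                    .name("connect-cluster")
                    .numWorkers(NUM_WORKERS)
                    .workerProps(new HashMap<>(props))
                    .build();
            cluster.start();
            return cluster;
        });
    }

    @AfterClass
    public static void close() {
        // Stop every shared cluster once, after the whole suite has run
        CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
    }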

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
index 26b4eb11417..a512eeaae0a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
@@ -73,7 +73,7 @@ public class ConnectorRestartApiIntegrationTest {
 
     private static final String TOPIC_NAME = "test-topic";
 
-    private static Map<Integer, EmbeddedConnectCluster> connectClusterMap = new ConcurrentHashMap<>();
+    private static final Map<Integer, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>();
 
     private EmbeddedConnectCluster connect;
     private ConnectorHandle connectorHandle;
@@ -91,7 +91,7 @@ public class ConnectorRestartApiIntegrationTest {
     }
 
     private void startOrReuseConnectWithNumWorkers(int numWorkers) throws Exception {
-        connect = connectClusterMap.computeIfAbsent(numWorkers, n -> {
+        connect = CONNECT_CLUSTERS.computeIfAbsent(numWorkers, n -> {
             // setup Connect worker properties
             Map<String, String> workerProps = new HashMap<>();
             workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
@@ -125,7 +125,7 @@ public class ConnectorRestartApiIntegrationTest {
     @AfterClass
     public static void close() {
         // stop all Connect, Kafka and Zk threads.
-        connectClusterMap.values().forEach(EmbeddedConnectCluster::stop);
+        CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index ad2a5f168ff..cc92effb778 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -30,22 +30,31 @@ import org.apache.kafka.connect.util.SinkUtils;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.NoRetryException;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
@@ -56,6 +65,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
 import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -70,41 +80,85 @@ public class OffsetsApiIntegrationTest {
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
-    private static final String CONNECTOR_NAME = "test-connector";
-    private static final String TOPIC = "test-topic";
     private static final int NUM_TASKS = 2;
     private static final int NUM_RECORDS_PER_PARTITION = 10;
-    private Map<String, String> workerProps;
-    private EmbeddedConnectCluster.Builder connectBuilder;
+    private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>();
+    @Rule
+    public TestName currentTest = new TestName();
     private EmbeddedConnectCluster connect;
+    private String connectorName;
+    private String topic;
 
     @Before
     public void setup() {
-        Properties brokerProps = new Properties();
-        brokerProps.put("transaction.state.log.replication.factor", "1");
-        brokerProps.put("transaction.state.log.min.isr", "1");
-
-        // setup Connect worker properties
-        workerProps = new HashMap<>();
-        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
-
-        // build a Connect cluster backed by Kafka and Zk
-        connectBuilder = new EmbeddedConnectCluster.Builder()
-                .name("connect-cluster")
-                .numWorkers(NUM_WORKERS)
-                .brokerProps(brokerProps)
-                .workerProps(workerProps);
+        connectorName = currentTest.getMethodName();
+        topic = currentTest.getMethodName();
+        connect = defaultConnectCluster();
     }
 
     @After
     public void tearDown() {
-        connect.stop();
+        Set<String> remainingConnectors = new HashSet<>(connect.connectors());
+        if (remainingConnectors.remove(connectorName)) {
+            connect.deleteConnector(connectorName);
+        }
+        try {
+            assertEquals(
+                    "Some connectors were not properly cleaned up after this test",
+                    Collections.emptySet(),
+                    remainingConnectors
+            );
+        } finally {
+            // Make a last-ditch effort to clean up the leaked connectors
+            // so as not to interfere with other test cases
+            remainingConnectors.forEach(connect::deleteConnector);
+        }
+    }
+
+    @AfterClass
+    public static void close() {
+        // stop all Connect, Kafka and Zk threads.
+        CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
+    }
+
+    private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) {
+        return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> {
+            Properties brokerProps = new Properties();
+            brokerProps.put("transaction.state.log.replication.factor", "1");
+            brokerProps.put("transaction.state.log.min.isr", "1");
+
+            // Have to declare a new map since the passed-in one may be immutable
+            Map<String, String> workerPropsWithDefaults = new HashMap<>(workerProps);
+            // Enable fast offset commits by default
+            workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+
+            EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder()
+                    .name("connect-cluster")
+                    .numWorkers(NUM_WORKERS)
+                    .brokerProps(brokerProps)
+                    .workerProps(workerPropsWithDefaults)
+                    .build();
+
+            result.start();
+
+            return result;
+        });
+    }
+
+    private static EmbeddedConnectCluster defaultConnectCluster() {
+        return createOrReuseConnectWithWorkerProps(Collections.emptyMap());
+    }
+
+    private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() {
+        Map<String, String> workerProps = Collections.singletonMap(
+                DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+                "enabled"
+        );
+        return createOrReuseConnectWithWorkerProps(workerProps);
     }
 
     @Test
     public void testGetNonExistentConnectorOffsets() {
-        connect = connectBuilder.build();
-        connect.start();
         ConnectRestException e = assertThrows(ConnectRestException.class,
                 () -> connect.connectorOffsets("non-existent-connector"));
         assertEquals(404, e.errorCode());
@@ -112,32 +166,29 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testGetSinkConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
     }
 
     @Test
     public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
-        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
-                "overridden-group-id");
+        String overriddenGroupId = connectorName + "-overridden-group-id";
+        connectorConfigs.put(
+                ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
+                overriddenGroupId
+        );
         getAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
 
         // Ensure that the overridden consumer group ID was the one actually used
         try (Admin admin = connect.kafka().createAdminClient()) {
             Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
-            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
-            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
         }
     }
 
     @Test
     public void testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
 
         try (AutoCloseable ignored = kafkaCluster::stop) {
@@ -154,54 +205,49 @@ public class OffsetsApiIntegrationTest {
     }
 
     private void getAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
-        kafkaCluster.createTopic(TOPIC, 5);
+        kafkaCluster.createTopic(topic, 5);
 
         // Produce records to each partition
         for (int partition = 0; partition < 5; partition++) {
             for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-                kafkaCluster.produce(TOPIC, partition, "key", "value");
+                kafkaCluster.produce(topic, partition, "key", "value");
             }
         }
 
         // Create sink connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, 5, NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
 
         // Produce more records to each partition
         for (int partition = 0; partition < 5; partition++) {
             for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-                kafkaCluster.produce(TOPIC, partition, "key", "value");
+                kafkaCluster.produce(topic, partition, "key", "value");
             }
         }
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, 2 * NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, 5, 2 * NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
     }
 
     @Test
     public void testGetSourceConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
     }
 
     @Test
     public void testGetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
-        connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
+        String connectorOffsetsTopic = connectorName + "-custom-offsets-topic";
+        connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, connectorOffsetsTopic);
         getAndVerifySourceConnectorOffsets(connectorConfigs);
     }
 
     @Test
     public void testGetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
 
         try (AutoCloseable ignored = kafkaCluster::stop) {
@@ -219,25 +265,23 @@ public class OffsetsApiIntegrationTest {
 
     private void getAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
         // Create source connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                 "Source connector offsets should reflect the expected number of records produced");
 
         // Each task should produce more records
         connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(2 * NUM_RECORDS_PER_PARTITION));
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.configureConnector(connectorName, connectorConfigs);
 
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION,
                 "Source connector offsets should reflect the expected number of records produced");
     }
 
     @Test
     public void testAlterOffsetsNonExistentConnector() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         ConnectRestException e = assertThrows(ConnectRestException.class,
                 () -> connect.alterConnectorOffsets("non-existent-connector", 
new ConnectorOffsets(Collections.singletonList(
                         new ConnectorOffset(Collections.emptyMap(), 
Collections.emptyMap())))));
@@ -246,67 +290,62 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testAlterOffsetsNonStoppedConnector() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         // Create source connector
-        connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, baseSourceConnectorConfigs());
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
         List<ConnectorOffset> offsets = new ArrayList<>();
         // The MonitorableSourceConnector has a source partition per task
         for (int i = 0; i < NUM_TASKS; i++) {
             offsets.add(
-                    new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i),
+                    new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i),
                             Collections.singletonMap("saved", 5))
             );
         }
 
         // Try altering offsets for a running connector
         ConnectRestException e = assertThrows(ConnectRestException.class,
-                () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsets)));
+                () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsets)));
         assertEquals(400, e.errorCode());
 
-        connect.pauseConnector(CONNECTOR_NAME);
+        connect.pauseConnector(connectorName);
         connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
-                CONNECTOR_NAME,
+                connectorName,
                 NUM_TASKS,
                 "Connector did not pause in time"
         );
 
         // Try altering offsets for a paused (not stopped) connector
         e = assertThrows(ConnectRestException.class,
-                () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsets)));
+                () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsets)));
         assertEquals(400, e.errorCode());
     }
 
     @Test
     public void testAlterSinkConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
     }
 
     @Test
     public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
-        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
-                "overridden-group-id");
+        String overriddenGroupId = connectorName + "-overridden-group-id";
+        connectorConfigs.put(
+                ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
+                overriddenGroupId
+        );
         alterAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
         // Ensure that the overridden consumer group ID was the one actually used
         try (Admin admin = connect.kafka().createAdminClient()) {
             Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
-            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
-            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
         }
     }
 
     @Test
     public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
 
         try (AutoCloseable ignored = kafkaCluster::stop) {
@@ -324,127 +363,125 @@ public class OffsetsApiIntegrationTest {
 
     private void alterAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
         int numPartitions = 3;
-        kafkaCluster.createTopic(TOPIC, numPartitions);
+        kafkaCluster.createTopic(topic, numPartitions);
 
         // Produce records to each partition
         for (int partition = 0; partition < numPartitions; partition++) {
             for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-                kafkaCluster.produce(TOPIC, partition, "key", "value");
+                kafkaCluster.produce(topic, partition, "key", "value");
             }
         }
         // Create sink connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
 
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
 
         // Delete the offset of one partition; alter the offsets of the others
         List<ConnectorOffset> offsetsToAlter = new ArrayList<>();
         Map<String, Object> partition = new HashMap<>();
-        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
         partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         offsetsToAlter.add(new ConnectorOffset(partition, null));
 
         for (int i = 1; i < numPartitions; i++) {
             partition = new HashMap<>();
-            partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+            partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
             partition.put(SinkUtils.KAFKA_PARTITION_KEY, i);
             offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 5)));
         }
 
-        String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+        // Alter the sink connector's offsets, with retry logic (since we just stopped the connector)
+        String response = modifySinkConnectorOffsetsWithRetry(new ConnectorOffsets(offsetsToAlter));
+
         assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " +
                 "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 5,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions - 1, 5,
                 "Sink connector consumer group offsets should reflect the altered offsets");
 
         // Update the connector's configs; this time expect SinkConnector::alterOffsets to return true
         connectorConfigs.put(MonitorableSinkConnector.ALTER_OFFSETS_RESULT, "true");
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.configureConnector(connectorName, connectorConfigs);
 
         // Alter offsets again while the connector is still in a stopped state
         offsetsToAlter.clear();
         for (int i = 1; i < numPartitions; i++) {
             partition = new HashMap<>();
-            partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+            partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
             partition.put(SinkUtils.KAFKA_PARTITION_KEY, i);
             offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 3)));
         }
 
-        response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+        response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter));
         assertThat(response, containsString("The offsets for this connector have been altered successfully"));
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 3,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions - 1, 3,
                 "Sink connector consumer group offsets should reflect the altered offsets");
 
         // Resume the connector and expect its offsets to catch up to the latest offsets
-        connect.resumeConnector(CONNECTOR_NAME);
+        connect.resumeConnector(connectorName);
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
-                CONNECTOR_NAME,
+                connectorName,
                 NUM_TASKS,
                 "Connector tasks did not resume in time"
         );
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
     }
 
     @Test
     public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
-        connect.kafka().createTopic(TOPIC, 1);
+        connect.kafka().createTopic(topic, 1);
 
         // Produce records
         for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-            connect.kafka().produce(TOPIC, 0, "key", "value");
+            connect.kafka().produce(topic, 0, "key", "value");
         }
 
         // Configure a sink connector whose sink task blocks in its stop method
         Map<String, String> connectorConfigs = new HashMap<>();
         connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
-        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
+        connectorConfigs.put(TOPICS_CONFIG, topic);
         connectorConfigs.put("block", "Task::stop");
 
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1,
                 "Connector tasks did not start in time.");
 
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
 
         // Try to delete the offsets for the single topic partition
         Map<String, Object> partition = new HashMap<>();
-        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
         partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, null));
 
         ConnectRestException e = assertThrows(ConnectRestException.class,
-                () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)));
+                () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)));
         assertThat(e.getMessage(), containsString("zombie sink task"));
     }
 
     @Test
     public void testAlterSinkConnectorOffsetsInvalidRequestBody() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         // Create a sink connector and stop it
-        connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorConfigs());
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, baseSinkConnectorConfigs());
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
-        String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
+        String url = connect.endpointForResource(String.format("connectors/%s/offsets", connectorName));
 
         String content = "{}";
         try (Response response = connect.requestPatch(url, content)) {
@@ -497,15 +534,11 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testAlterSourceConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
     }
 
     @Test
     public void testAlterSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
         connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
         alterAndVerifySourceConnectorOffsets(connectorConfigs);
@@ -513,8 +546,6 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testAlterSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
 
         try (AutoCloseable ignored = kafkaCluster::stop) {
@@ -532,25 +563,23 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
-        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
-        connect = connectBuilder.workerProps(workerProps).build();
-        connect.start();
+        connect = exactlyOnceSourceConnectCluster();
 
         alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
     }
 
     public void alterAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
         // Create source connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                 "Source connector offsets should reflect the expected number of records produced");
 
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
 
@@ -558,63 +587,61 @@ public class OffsetsApiIntegrationTest {
         // The MonitorableSourceConnector has a source partition per task
         for (int i = 0; i < NUM_TASKS; i++) {
             offsetsToAlter.add(
-                    new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i),
+                    new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i),
                             Collections.singletonMap("saved", 5))
             );
         }
 
-        String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+        String response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter));
         assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " +
                 "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));
 
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 5,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 5,
                 "Source connector offsets should reflect the altered offsets");
 
         // Update the connector's configs; this time expect SourceConnector::alterOffsets to return true
         connectorConfigs.put(MonitorableSourceConnector.ALTER_OFFSETS_RESULT, "true");
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.configureConnector(connectorName, connectorConfigs);
 
         // Alter offsets again while connector is in stopped state
         offsetsToAlter = new ArrayList<>();
         // The MonitorableSourceConnector has a source partition per task
         for (int i = 0; i < NUM_TASKS; i++) {
             offsetsToAlter.add(
-                    new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i),
+                    new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i),
                             Collections.singletonMap("saved", 7))
             );
         }
 
-        response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+        response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter));
         assertThat(response, containsString("The offsets for this connector have been altered successfully"));
 
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 7,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 7,
                 "Source connector offsets should reflect the altered offsets");
 
         // Resume the connector and expect its offsets to catch up to the latest offsets
-        connect.resumeConnector(CONNECTOR_NAME);
+        connect.resumeConnector(connectorName);
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
-                CONNECTOR_NAME,
+                connectorName,
                 NUM_TASKS,
                 "Connector tasks did not resume in time"
         );
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                 "Source connector offsets should reflect the expected number of records produced");
     }
 
     @Test
     public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         // Create a source connector and stop it
-        connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, baseSourceConnectorConfigs());
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
-        String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
+        String url = connect.endpointForResource(String.format("connectors/%s/offsets", connectorName));
 
         String content = "[]";
         try (Response response = connect.requestPatch(url, content)) {
@@ -667,31 +694,28 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testResetSinkConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
     }
 
     @Test
     public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
-        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
-                "overridden-group-id");
+        String overriddenGroupId = connectorName + "-overridden-group-id";
+        connectorConfigs.put(
+                ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
+                overriddenGroupId
+        );
         resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
         // Ensure that the overridden consumer group ID was the one actually used
         try (Admin admin = connect.kafka().createAdminClient()) {
             Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
-            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
-            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
         }
     }
 
     @Test
     public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
 
         try (AutoCloseable ignored = kafkaCluster::stop) {
@@ -709,95 +733,89 @@ public class OffsetsApiIntegrationTest {
 
     private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
         int numPartitions = 3;
-        kafkaCluster.createTopic(TOPIC, numPartitions);
+        kafkaCluster.createTopic(topic, numPartitions);
 
         // Produce records to each partition
         for (int partition = 0; partition < numPartitions; partition++) {
             for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-                kafkaCluster.produce(TOPIC, partition, "key", "value");
+                kafkaCluster.produce(topic, partition, "key", "value");
             }
         }
         // Create sink connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
 
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
 
-        // Reset the sink connector's offsets
-        String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        // Reset the sink connector's offsets, with retry logic (since we just stopped the connector)
+        String response = modifySinkConnectorOffsetsWithRetry(null);
         assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
                 "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
 
-        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
+        verifyEmptyConnectorOffsets(connectorName);
 
         // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error
-        response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        response = connect.resetConnectorOffsets(connectorName);
         assertThat(response, containsString("The Connect framework-managed 
offsets for this connector have been reset successfully. " +
                 "However, if this connector manages offsets externally, they 
will need to be manually reset in the system that the connector uses."));
 
-        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
+        verifyEmptyConnectorOffsets(connectorName);
 
         // Resume the connector and expect its offsets to catch up to the latest offsets
-        connect.resumeConnector(CONNECTOR_NAME);
+        connect.resumeConnector(connectorName);
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
-                CONNECTOR_NAME,
+                connectorName,
                 NUM_TASKS,
                 "Connector tasks did not resume in time"
         );
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
     }
 
     @Test
     public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
-        connect.kafka().createTopic(TOPIC, 1);
+        connect.kafka().createTopic(topic, 1);
 
         // Produce records
         for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
-            connect.kafka().produce(TOPIC, 0, "key", "value");
+            connect.kafka().produce(topic, 0, "key", "value");
         }
 
         // Configure a sink connector whose sink task blocks in its stop method
         Map<String, String> connectorConfigs = new HashMap<>();
         connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
-        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
+        connectorConfigs.put(TOPICS_CONFIG, topic);
         connectorConfigs.put("block", "Task::stop");
 
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION,
                 "Sink connector consumer group offsets should catch up to the topic end offsets");
 
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
 
         // Try to reset the offsets
-        ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
+        ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName));
         assertThat(e.getMessage(), containsString("zombie sink task"));
     }
 
     @Test
     public void testResetSourceConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
     }
 
     @Test
     public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
         connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
         resetAndVerifySourceConnectorOffsets(connectorConfigs);
@@ -805,8 +823,6 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
 
         try (AutoCloseable ignored = kafkaCluster::stop) {
@@ -824,50 +840,48 @@ public class OffsetsApiIntegrationTest {
 
     @Test
     public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
-        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
-        connect = connectBuilder.workerProps(workerProps).build();
-        connect.start();
+        connect = exactlyOnceSourceConnectCluster();
 
         resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
     }
 
     public void resetAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
         // Create source connector
-        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
-        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+        connect.configureConnector(connectorName, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                 "Source connector offsets should reflect the expected number of records produced");
 
-        connect.stopConnector(CONNECTOR_NAME);
+        connect.stopConnector(connectorName);
         connect.assertions().assertConnectorIsStopped(
-                CONNECTOR_NAME,
+                connectorName,
                 "Connector did not stop in time"
         );
 
         // Reset the source connector's offsets
-        String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        String response = connect.resetConnectorOffsets(connectorName);
         assertThat(response, containsString("The Connect framework-managed 
offsets for this connector have been reset successfully. " +
                 "However, if this connector manages offsets externally, they 
will need to be manually reset in the system that the connector uses."));
 
-        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
+        verifyEmptyConnectorOffsets(connectorName);
 
         // Reset the source connector's offsets again while it is still in a STOPPED state and ensure that there is no error
-        response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        response = connect.resetConnectorOffsets(connectorName);
         assertThat(response, containsString("The Connect framework-managed 
offsets for this connector have been reset successfully. " +
                 "However, if this connector manages offsets externally, they 
will need to be manually reset in the system that the connector uses."));
 
-        verifyEmptyConnectorOffsets(CONNECTOR_NAME);
+        verifyEmptyConnectorOffsets(connectorName);
 
         // Resume the connector and expect its offsets to catch up to the latest offsets
-        connect.resumeConnector(CONNECTOR_NAME);
+        connect.resumeConnector(connectorName);
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
-                CONNECTOR_NAME,
+                connectorName,
                 NUM_TASKS,
                 "Connector tasks did not resume in time"
         );
-        verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
+        verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
                 "Source connector offsets should reflect the expected number of records produced");
     }
 
@@ -875,7 +889,7 @@ public class OffsetsApiIntegrationTest {
         Map<String, String> configs = new HashMap<>();
         configs.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
         configs.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
-        configs.put(TOPICS_CONFIG, TOPIC);
+        configs.put(TOPICS_CONFIG, topic);
         configs.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         configs.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         return configs;
@@ -885,7 +899,7 @@ public class OffsetsApiIntegrationTest {
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
-        props.put(TOPIC_CONFIG, TOPIC);
+        props.put(TOPIC_CONFIG, topic);
         props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "3");
         props.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(NUM_RECORDS_PER_PARTITION));
         props.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
@@ -895,6 +909,56 @@ public class OffsetsApiIntegrationTest {
         return props;
     }
 
+    /**
+     * Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to
+     * handle cases where laggy task shutdown may have left a consumer in the group.
+     * @param offsetsToAlter the offsets to alter for the sink connector, or null if
+     *                       the connector's offsets should be reset instead
+     * @return the response from the REST API, if the request was successful
+     * @throws InterruptedException if the thread is interrupted while waiting for a
+     * request to modify the connector's offsets to succeed
+     * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a>
+     */
+    private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException {
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running
+        String modifyVerb = offsetsToAlter != null ?  "alter" : "reset";
+        String conditionDetails = "Failed to " + modifyVerb + " sink connector offsets in time";
+        AtomicReference<String> responseReference = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    try {
+                        if (offsetsToAlter == null) {
+                            responseReference.set(connect.resetConnectorOffsets(connectorName));
+                        } else {
+                            responseReference.set(connect.alterConnectorOffsets(connectorName, offsetsToAlter));
+                        }
+                        return true;
+                    } catch (ConnectRestException e) {
+                        boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode();
+
+                        String message = Optional.of(e.getMessage()).orElse("");
+                        boolean failedToModifyConsumerOffsets = message.contains(
+                                "Failed to " + modifyVerb + " consumer group offsets for connector"
+                        );
+                        boolean canBeRetried = message.contains("If the connector is in a stopped state, this operation can be safely retried");
+
+                        boolean retriable = internalServerError && failedToModifyConsumerOffsets && canBeRetried;
+                        if (retriable) {
+                            return false;
+                        } else {
+                            throw new NoRetryException(e);
+                        }
+                    } catch (Throwable t) {
+                        throw new NoRetryException(t);
+                    }
+                },
+                30_000,
+                conditionDetails
+        );
+        return responseReference.get();
+    }
+
     /**
      * Verify whether the actual consumer group offsets for a sink connector match the expected offsets. The verification
      * is done using the <strong><em>GET /connectors/{connector}/offsets</em></strong> REST API which is repeatedly queried
@@ -914,7 +978,7 @@ public class OffsetsApiIntegrationTest {
      */
     private void verifyExpectedSinkConnectorOffsets(String connectorName, String expectedTopic, int expectedPartitions,
                                                     int expectedOffset, String conditionDetails) throws InterruptedException {
-        TestUtils.waitForCondition(() -> {
+        waitForCondition(() -> {
             ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
             if (offsets.offsets().size() != expectedPartitions) {
                 return false;
@@ -944,14 +1008,14 @@ public class OffsetsApiIntegrationTest {
      */
     private void verifyExpectedSourceConnectorOffsets(String connectorName, int numTasks,
                                                       int expectedOffset, String conditionDetails) throws InterruptedException {
-        TestUtils.waitForCondition(() -> {
+        waitForCondition(() -> {
             ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
             // The MonitorableSourceConnector has a source partition per task
             if (offsets.offsets().size() != numTasks) {
                 return false;
             }
             for (ConnectorOffset offset : offsets.offsets()) {
-                assertTrue(((String) offset.partition().get("task.id")).startsWith(CONNECTOR_NAME));
+                assertTrue(((String) offset.partition().get("task.id")).startsWith(connectorName));
                 if ((Integer) offset.offset().get("saved") != expectedOffset) {
                     return false;
                 }
@@ -969,7 +1033,7 @@ public class OffsetsApiIntegrationTest {
      * @throws InterruptedException if the thread is interrupted while waiting for the offsets to be empty
      */
     private void verifyEmptyConnectorOffsets(String connectorName) throws InterruptedException {
-        TestUtils.waitForCondition(() -> {
+        waitForCondition(() -> {
             ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
             return offsets.offsets().isEmpty();
         }, OFFSET_READ_TIMEOUT_MS, "Connector offsets should be empty after 
resetting offsets");
