This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c7e1fdca64d KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966) c7e1fdca64d is described below commit c7e1fdca64dcbd8cedf8aaf826c22566b7485dad Author: Chris Egerton <chr...@aiven.io> AuthorDate: Tue Jan 9 10:32:39 2024 -0500 KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966) Reviewers: Sudesh Wasnik <swas...@confluent.io>, Sagar Rao <sagarmeansoc...@gmail.com>, Yash Mayya <yash.ma...@gmail.com>, Greg Harris <greg.har...@aiven.io> --- .../ConnectorRestartApiIntegrationTest.java | 6 +- .../integration/OffsetsApiIntegrationTest.java | 464 ++++++++++++--------- 2 files changed, 267 insertions(+), 203 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java index 26b4eb11417..a512eeaae0a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java @@ -73,7 +73,7 @@ public class ConnectorRestartApiIntegrationTest { private static final String TOPIC_NAME = "test-topic"; - private static Map<Integer, EmbeddedConnectCluster> connectClusterMap = new ConcurrentHashMap<>(); + private static final Map<Integer, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); private EmbeddedConnectCluster connect; private ConnectorHandle connectorHandle; @@ -91,7 +91,7 @@ public class ConnectorRestartApiIntegrationTest { } private void startOrReuseConnectWithNumWorkers(int numWorkers) throws Exception { - connect = connectClusterMap.computeIfAbsent(numWorkers, n -> { + connect = CONNECT_CLUSTERS.computeIfAbsent(numWorkers, n -> { // setup Connect worker properties Map<String, String> workerProps = new HashMap<>(); workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); @@ -125,7 +125,7 @@ public class ConnectorRestartApiIntegrationTest { @AfterClass public static void close() { // stop all Connect, Kafka and Zk threads. - connectClusterMap.values().forEach(EmbeddedConnectCluster::stop); + CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index ad2a5f168ff..cc92effb778 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -30,22 +30,31 @@ import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; +import org.apache.kafka.test.NoRetryException; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; @@ -56,6 +65,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -70,41 +80,85 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; - private static final String CONNECTOR_NAME = "test-connector"; - private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; - private Map<String, String> workerProps; - private EmbeddedConnectCluster.Builder connectBuilder; + private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); + @Rule + public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; + private String connectorName; + private String topic; @Before public void setup() { - Properties brokerProps = new Properties(); - brokerProps.put("transaction.state.log.replication.factor", "1"); - brokerProps.put("transaction.state.log.min.isr", "1"); - - // setup Connect worker properties - workerProps = new HashMap<>(); - workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - - // build a Connect cluster backed by Kafka and Zk - connectBuilder = new EmbeddedConnectCluster.Builder() - .name("connect-cluster") - .numWorkers(NUM_WORKERS) - .brokerProps(brokerProps) - .workerProps(workerProps); + connectorName = currentTest.getMethodName(); + topic = currentTest.getMethodName(); + connect = defaultConnectCluster(); } @After public void tearDown() { - connect.stop(); + Set<String> remainingConnectors = new HashSet<>(connect.connectors()); + if (remainingConnectors.remove(connectorName)) { + connect.deleteConnector(connectorName); + } + try { + assertEquals( + "Some connectors were not properly cleaned up after this test", + Collections.emptySet(), + remainingConnectors + ); + } finally { + // Make a last-ditch effort to clean up the leaked connectors + // so as not to interfere with other test cases + remainingConnectors.forEach(connect::deleteConnector); + } + } + + @AfterClass + public static void close() { + // stop all Connect, Kafka and Zk threads. + CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); + } + + private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) { + return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> { + Properties brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + + // Have to declare a new map since the passed-in one may be immutable + Map<String, String> workerPropsWithDefaults = new HashMap<>(workerProps); + // Enable fast offset commits by default + workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); + + EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder() + .name("connect-cluster") + .numWorkers(NUM_WORKERS) + .brokerProps(brokerProps) + .workerProps(workerPropsWithDefaults) + .build(); + + result.start(); + + return result; + }); + } + + private static EmbeddedConnectCluster defaultConnectCluster() { + return createOrReuseConnectWithWorkerProps(Collections.emptyMap()); + } + + private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() { + Map<String, String> workerProps = Collections.singletonMap( + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, + "enabled" + ); + return createOrReuseConnectWithWorkerProps(workerProps); } @Test public void testGetNonExistentConnectorOffsets() { - connect = connectBuilder.build(); - connect.start(); ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.connectorOffsets("non-existent-connector")); assertEquals(404, e.errorCode()); @@ -112,32 +166,29 @@ public class OffsetsApiIntegrationTest { @Test public void testGetSinkConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); } @Test public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); - connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, - "overridden-group-id"); + String overriddenGroupId = connectorName + "-overridden-group-id"; + connectorConfigs.put( + ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + overriddenGroupId + ); getAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); // Ensure that the overridden consumer group ID was the one actually used try (Admin admin = connect.kafka().createAdminClient()) { Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get(); - assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); - assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId()))); } } @Test public void testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { - connect = connectBuilder.build(); - connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -154,54 +205,49 @@ public class OffsetsApiIntegrationTest { } private void getAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { - kafkaCluster.createTopic(TOPIC, 5); + kafkaCluster.createTopic(topic, 5); // Produce records to each partition for (int partition = 0; partition < 5; partition++) { for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { - kafkaCluster.produce(TOPIC, partition, "key", "value"); + kafkaCluster.produce(topic, partition, "key", "value"); } } // Create sink connector - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, NUM_RECORDS_PER_PARTITION, + verifyExpectedSinkConnectorOffsets(connectorName, topic, 5, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); // Produce more records to each partition for (int partition = 0; partition < 5; partition++) { for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { - kafkaCluster.produce(TOPIC, partition, "key", "value"); + kafkaCluster.produce(topic, partition, "key", "value"); } } - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, 2 * NUM_RECORDS_PER_PARTITION, + verifyExpectedSinkConnectorOffsets(connectorName, topic, 5, 2 * NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); } @Test public void testGetSourceConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } @Test public void testGetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSourceConnectorConfigs(); - connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); + String connectorOffsetsTopic = connectorName + "-custom-offsets-topic"; + connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, connectorOffsetsTopic); getAndVerifySourceConnectorOffsets(connectorConfigs); } @Test public void testGetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { - connect = connectBuilder.build(); - connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -219,25 +265,23 @@ public class OffsetsApiIntegrationTest { private void getAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception { // Create source connector - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); // Each task should produce more records connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(2 * NUM_RECORDS_PER_PARTITION)); - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.configureConnector(connectorName, connectorConfigs); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); } @Test public void testAlterOffsetsNonExistentConnector() throws Exception { - connect = connectBuilder.build(); - connect.start(); ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.alterConnectorOffsets("non-existent-connector", new ConnectorOffsets(Collections.singletonList( new ConnectorOffset(Collections.emptyMap(), Collections.emptyMap()))))); @@ -246,67 +290,62 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterOffsetsNonStoppedConnector() throws Exception { - connect = connectBuilder.build(); - connect.start(); // Create source connector - connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs()); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, baseSourceConnectorConfigs()); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); List<ConnectorOffset> offsets = new ArrayList<>(); // The MonitorableSourceConnector has a source partition per task for (int i = 0; i < NUM_TASKS; i++) { offsets.add( - new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i), + new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i), Collections.singletonMap("saved", 5)) ); } // Try altering offsets for a running connector ConnectRestException e = assertThrows(ConnectRestException.class, - () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsets))); + () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsets))); assertEquals(400, e.errorCode()); - connect.pauseConnector(CONNECTOR_NAME); + connect.pauseConnector(connectorName); connect.assertions().assertConnectorAndExactlyNumTasksArePaused( - CONNECTOR_NAME, + connectorName, NUM_TASKS, "Connector did not pause in time" ); // Try altering offsets for a paused (not stopped) connector e = assertThrows(ConnectRestException.class, - () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsets))); + () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsets))); assertEquals(400, e.errorCode()); } @Test public void testAlterSinkConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); } @Test public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); - connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, - "overridden-group-id"); + String overriddenGroupId = connectorName + "-overridden-group-id"; + connectorConfigs.put( + ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + overriddenGroupId + ); alterAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); // Ensure that the overridden consumer group ID was the one actually used try (Admin admin = connect.kafka().createAdminClient()) { Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get(); - assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); - assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId()))); } } @Test public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { - connect = connectBuilder.build(); - connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -324,127 +363,125 @@ public class OffsetsApiIntegrationTest { private void alterAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { int numPartitions = 3; - kafkaCluster.createTopic(TOPIC, numPartitions); + kafkaCluster.createTopic(topic, numPartitions); // Produce records to each partition for (int partition = 0; partition < numPartitions; partition++) { for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { - kafkaCluster.produce(TOPIC, partition, "key", "value"); + kafkaCluster.produce(topic, partition, "key", "value"); } } // Create sink connector - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, + verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); connect.assertions().assertConnectorIsStopped( - CONNECTOR_NAME, + connectorName, "Connector did not stop in time" ); // Delete the offset of one partition; alter the offsets of the others List<ConnectorOffset> offsetsToAlter = new ArrayList<>(); Map<String, Object> partition = new HashMap<>(); - partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic); partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); offsetsToAlter.add(new ConnectorOffset(partition, null)); for (int i = 1; i < numPartitions; i++) { partition = new HashMap<>(); - partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic); partition.put(SinkUtils.KAFKA_PARTITION_KEY, i); offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 5))); } - String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + // Alter the sink connector's offsets, with retry logic (since we just stopped the connector) + String response = modifySinkConnectorOffsetsWithRetry(new ConnectorOffsets(offsetsToAlter)); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " + "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses.")); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 5, + verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions - 1, 5, "Sink connector consumer group offsets should reflect the altered offsets"); // Update the connector's configs; this time expect SinkConnector::alterOffsets to return true connectorConfigs.put(MonitorableSinkConnector.ALTER_OFFSETS_RESULT, "true"); - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.configureConnector(connectorName, connectorConfigs); // Alter offsets again while the connector is still in a stopped state offsetsToAlter.clear(); for (int i = 1; i < numPartitions; i++) { partition = new HashMap<>(); - partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic); partition.put(SinkUtils.KAFKA_PARTITION_KEY, i); offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 3))); } - response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)); assertThat(response, containsString("The offsets for this connector have been altered successfully")); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 3, + verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions - 1, 3, "Sink connector consumer group offsets should reflect the altered offsets"); // Resume the connector and expect its offsets to catch up to the latest offsets - connect.resumeConnector(CONNECTOR_NAME); + connect.resumeConnector(connectorName); connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( - CONNECTOR_NAME, + connectorName, NUM_TASKS, "Connector tasks did not resume in time" ); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, + verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); } @Test public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception { - connect = connectBuilder.build(); - connect.start(); - connect.kafka().createTopic(TOPIC, 1); + connect.kafka().createTopic(topic, 1); // Produce records for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { - connect.kafka().produce(TOPIC, 0, "key", "value"); + connect.kafka().produce(topic, 0, "key", "value"); } // Configure a sink connector whose sink task blocks in its stop method Map<String, String> connectorConfigs = new HashMap<>(); connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName()); - connectorConfigs.put(TOPICS_CONFIG, TOPIC); + connectorConfigs.put(TOPICS_CONFIG, topic); connectorConfigs.put("block", "Task::stop"); - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1, "Connector tasks did not start in time."); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); // Try to delete the offsets for the single topic partition Map<String, Object> partition = new HashMap<>(); - partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic); partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, null)); ConnectRestException e = assertThrows(ConnectRestException.class, - () -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter))); + () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter))); assertThat(e.getMessage(), containsString("zombie sink task")); } @Test public void testAlterSinkConnectorOffsetsInvalidRequestBody() throws Exception { - connect = connectBuilder.build(); - connect.start(); // Create a sink connector and stop it - connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorConfigs()); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, baseSinkConnectorConfigs()); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); connect.assertions().assertConnectorIsStopped( - CONNECTOR_NAME, + connectorName, "Connector did not stop in time" ); - String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME)); + String url = connect.endpointForResource(String.format("connectors/%s/offsets", connectorName)); String content = "{}"; try (Response response = connect.requestPatch(url, content)) { @@ -497,15 +534,11 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterSourceConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } @Test public void testAlterSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSourceConnectorConfigs(); connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); alterAndVerifySourceConnectorOffsets(connectorConfigs); @@ -513,8 +546,6 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { - connect = connectBuilder.build(); - connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -532,25 +563,23 @@ public class OffsetsApiIntegrationTest { @Test public void testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception { - workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); - connect = connectBuilder.workerProps(workerProps).build(); - connect.start(); + connect = exactlyOnceSourceConnectCluster(); alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } public void alterAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception { // Create source connector - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); connect.assertions().assertConnectorIsStopped( - CONNECTOR_NAME, + connectorName, "Connector did not stop in time" ); @@ -558,63 +587,61 @@ public class OffsetsApiIntegrationTest { // The MonitorableSourceConnector has a source partition per task for (int i = 0; i < NUM_TASKS; i++) { offsetsToAlter.add( - new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i), + new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i), Collections.singletonMap("saved", 5)) ); } - String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + String response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)); assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " + "However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses.")); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 5, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 5, "Source connector offsets should reflect the altered offsets"); // Update the connector's configs; this time expect SourceConnector::alterOffsets to return true connectorConfigs.put(MonitorableSourceConnector.ALTER_OFFSETS_RESULT, "true"); - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.configureConnector(connectorName, connectorConfigs); // Alter offsets again while connector is in stopped state offsetsToAlter = new ArrayList<>(); // The MonitorableSourceConnector has a source partition per task for (int i = 0; i < NUM_TASKS; i++) { offsetsToAlter.add( - new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i), + new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i), Collections.singletonMap("saved", 7)) ); } - response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)); assertThat(response, containsString("The offsets for this connector have been altered successfully")); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 7, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 7, "Source connector offsets should reflect the altered offsets"); // Resume the connector and expect its offsets to catch up to the latest offsets - connect.resumeConnector(CONNECTOR_NAME); + connect.resumeConnector(connectorName); connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( - CONNECTOR_NAME, + connectorName, NUM_TASKS, "Connector tasks did not resume in time" ); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); } @Test public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception { - connect = connectBuilder.build(); - connect.start(); // Create a source connector and stop it - connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs()); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, baseSourceConnectorConfigs()); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); connect.assertions().assertConnectorIsStopped( - CONNECTOR_NAME, + connectorName, "Connector did not stop in time" ); - String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME)); + String url = connect.endpointForResource(String.format("connectors/%s/offsets", connectorName)); String content = "[]"; try (Response response = connect.requestPatch(url, content)) { @@ -667,31 +694,28 @@ public class OffsetsApiIntegrationTest { @Test public void testResetSinkConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); } @Test public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); - connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, - "overridden-group-id"); + String overriddenGroupId = connectorName + "-overridden-group-id"; + connectorConfigs.put( + ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + overriddenGroupId + ); resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); // Ensure that the overridden consumer group ID was the one actually used try (Admin admin = connect.kafka().createAdminClient()) { Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get(); - assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); - assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId()))); } } @Test public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { - connect = connectBuilder.build(); - connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -709,95 +733,89 @@ public class OffsetsApiIntegrationTest { private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { int numPartitions = 3; - kafkaCluster.createTopic(TOPIC, numPartitions); + kafkaCluster.createTopic(topic, numPartitions); // Produce records to each partition for (int partition = 0; partition < numPartitions; partition++) { for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { - kafkaCluster.produce(TOPIC, partition, "key", "value"); + kafkaCluster.produce(topic, partition, "key", "value"); } } // Create sink connector - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, + verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); connect.assertions().assertConnectorIsStopped( - CONNECTOR_NAME, + connectorName, "Connector did not stop in time" ); - // Reset the sink connector's offsets - String response = connect.resetConnectorOffsets(CONNECTOR_NAME); + // Reset the sink connector's offsets, with retry logic (since we just stopped the connector) + String response = modifySinkConnectorOffsetsWithRetry(null); assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); - verifyEmptyConnectorOffsets(CONNECTOR_NAME); + verifyEmptyConnectorOffsets(connectorName); // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error - response = connect.resetConnectorOffsets(CONNECTOR_NAME); + response = connect.resetConnectorOffsets(connectorName); assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); - verifyEmptyConnectorOffsets(CONNECTOR_NAME); + verifyEmptyConnectorOffsets(connectorName); // Resume the connector and expect its offsets to catch up to the latest offsets - connect.resumeConnector(CONNECTOR_NAME); + connect.resumeConnector(connectorName); connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( - CONNECTOR_NAME, + connectorName, NUM_TASKS, "Connector tasks did not resume in time" ); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION, + verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); } @Test public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { - connect = connectBuilder.build(); - connect.start(); - connect.kafka().createTopic(TOPIC, 1); + connect.kafka().createTopic(topic, 1); // Produce records for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) { - connect.kafka().produce(TOPIC, 0, "key", "value"); + connect.kafka().produce(topic, 0, "key", "value"); } // Configure a sink connector whose sink task blocks in its stop method Map<String, String> connectorConfigs = new HashMap<>(); connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName()); - connectorConfigs.put(TOPICS_CONFIG, TOPIC); + connectorConfigs.put(TOPICS_CONFIG, topic); connectorConfigs.put("block", "Task::stop"); - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1, "Connector tasks did not start in time."); - verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, NUM_RECORDS_PER_PARTITION, + verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION, "Sink connector consumer group offsets should catch up to the topic end offsets"); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); // Try to reset the offsets - ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); + ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName)); assertThat(e.getMessage(), containsString("zombie sink task")); } @Test public void testResetSourceConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } @Test public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSourceConnectorConfigs(); connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); resetAndVerifySourceConnectorOffsets(connectorConfigs); @@ -805,8 +823,6 @@ public class OffsetsApiIntegrationTest { @Test public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { - connect = connectBuilder.build(); - connect.start(); EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); try (AutoCloseable ignored = kafkaCluster::stop) { @@ -824,50 +840,48 @@ public class OffsetsApiIntegrationTest { @Test public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception { - workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); - connect = connectBuilder.workerProps(workerProps).build(); - connect.start(); + connect = exactlyOnceSourceConnectCluster(); resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs()); } public void resetAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception { // Create source connector - connect.configureConnector(CONNECTOR_NAME, connectorConfigs); - connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + connect.configureConnector(connectorName, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks did not start in time."); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); - connect.stopConnector(CONNECTOR_NAME); + connect.stopConnector(connectorName); connect.assertions().assertConnectorIsStopped( - CONNECTOR_NAME, + connectorName, "Connector did not stop in time" ); // Reset the source connector's offsets - String response = connect.resetConnectorOffsets(CONNECTOR_NAME); + String response = connect.resetConnectorOffsets(connectorName); assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); - verifyEmptyConnectorOffsets(CONNECTOR_NAME); + verifyEmptyConnectorOffsets(connectorName); // Reset the source connector's offsets again while it is still in a STOPPED state and ensure that there is no error - response = connect.resetConnectorOffsets(CONNECTOR_NAME); + response = connect.resetConnectorOffsets(connectorName); assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); - verifyEmptyConnectorOffsets(CONNECTOR_NAME); + verifyEmptyConnectorOffsets(connectorName); // Resume the connector and expect its offsets to catch up to the latest offsets - connect.resumeConnector(CONNECTOR_NAME); + connect.resumeConnector(connectorName); connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( - CONNECTOR_NAME, + connectorName, NUM_TASKS, "Connector tasks did not resume in time" ); - verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION, + verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION, "Source connector offsets should reflect the expected number of records produced"); } @@ -875,7 +889,7 @@ public class OffsetsApiIntegrationTest { Map<String, String> configs = new HashMap<>(); configs.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName()); configs.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS)); - configs.put(TOPICS_CONFIG, TOPIC); + configs.put(TOPICS_CONFIG, topic); configs.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); configs.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); return configs; @@ -885,7 +899,7 @@ public class OffsetsApiIntegrationTest { Map<String, String> props = new HashMap<>(); props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS)); - props.put(TOPIC_CONFIG, TOPIC); + props.put(TOPIC_CONFIG, topic); props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "3"); props.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(NUM_RECORDS_PER_PARTITION)); props.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); @@ -895,6 +909,56 @@ public class OffsetsApiIntegrationTest { return props; } + /** + * Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to + * handle cases where laggy task shutdown may have left a consumer in the group. + * @param offsetsToAlter the offsets to alter for the sink connector, or null if + * the connector's offsets should be reset instead + * @return the response from the REST API, if the request was successful + * @throws InterruptedException if the thread is interrupted while waiting for a + * request to modify the connector's offsets to succeed + * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a> + */ + private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException { + // Some retry logic is necessary to account for KAFKA-15826, + // where laggy sink task startup/shutdown can leave consumers running + String modifyVerb = offsetsToAlter != null ? "alter" : "reset"; + String conditionDetails = "Failed to " + modifyVerb + " sink connector offsets in time"; + AtomicReference<String> responseReference = new AtomicReference<>(); + waitForCondition( + () -> { + try { + if (offsetsToAlter == null) { + responseReference.set(connect.resetConnectorOffsets(connectorName)); + } else { + responseReference.set(connect.alterConnectorOffsets(connectorName, offsetsToAlter)); + } + return true; + } catch (ConnectRestException e) { + boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode(); + + String message = Optional.of(e.getMessage()).orElse(""); + boolean failedToModifyConsumerOffsets = message.contains( + "Failed to " + modifyVerb + " consumer group offsets for connector" + ); + boolean canBeRetried = message.contains("If the connector is in a stopped state, this operation can be safely retried"); + + boolean retriable = internalServerError && failedToModifyConsumerOffsets && canBeRetried; + if (retriable) { + return false; + } else { + throw new NoRetryException(e); + } + } catch (Throwable t) { + throw new NoRetryException(t); + } + }, + 30_000, + conditionDetails + ); + return responseReference.get(); + } + /** * Verify whether the actual consumer group offsets for a sink connector match the expected offsets. The verification * is done using the <strong><em>GET /connectors/{connector}/offsets</em></strong> REST API which is repeatedly queried @@ -914,7 +978,7 @@ public class OffsetsApiIntegrationTest { */ private void verifyExpectedSinkConnectorOffsets(String connectorName, String expectedTopic, int expectedPartitions, int expectedOffset, String conditionDetails) throws InterruptedException { - TestUtils.waitForCondition(() -> { + waitForCondition(() -> { ConnectorOffsets offsets = connect.connectorOffsets(connectorName); if (offsets.offsets().size() != expectedPartitions) { return false; @@ -944,14 +1008,14 @@ public class OffsetsApiIntegrationTest { */ private void verifyExpectedSourceConnectorOffsets(String connectorName, int numTasks, int expectedOffset, String conditionDetails) throws InterruptedException { - TestUtils.waitForCondition(() -> { + waitForCondition(() -> { ConnectorOffsets offsets = connect.connectorOffsets(connectorName); // The MonitorableSourceConnector has a source partition per task if (offsets.offsets().size() != numTasks) { return false; } for (ConnectorOffset offset : offsets.offsets()) { - assertTrue(((String) offset.partition().get("task.id")).startsWith(CONNECTOR_NAME)); + assertTrue(((String) offset.partition().get("task.id")).startsWith(connectorName)); if ((Integer) offset.offset().get("saved") != expectedOffset) { return false; } @@ -969,7 +1033,7 @@ public class OffsetsApiIntegrationTest { * @throws InterruptedException if the thread is interrupted while waiting for the offsets to be empty */ private void verifyEmptyConnectorOffsets(String connectorName) throws InterruptedException { - TestUtils.waitForCondition(() -> { + waitForCondition(() -> { ConnectorOffsets offsets = connect.connectorOffsets(connectorName); return offsets.offsets().isEmpty(); }, OFFSET_READ_TIMEOUT_MS, "Connector offsets should be empty after resetting offsets");