yashmayya commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1152720712
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt } } + /** + * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the + * request finishes processing. + * + * @param connName the name of the connector whose offsets are to be retrieved + * @param connectorConfig the connector's configurations + * @param cb callback to invoke upon completion of the request + */ + public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) { + executor.submit(() -> { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); Review Comment: Whoops, thanks, that was a remnant from before we wrapped this whole section with the thread context classloader switch. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt } } + /** + * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the + * request finishes processing. + * + * @param connName the name of the connector whose offsets are to be retrieved + * @param connectorConfig the connector's configurations + * @param cb callback to invoke upon completion of the request + */ + public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) { + executor.submit(() -> { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); + if (ConnectUtils.isSinkConnector(connector)) { + log.debug("Fetching offsets for sink connector: {}", connName); + sinkConnectorOffsets(connName, connector, connectorConfig, cb); + } else { + log.debug("Fetching offsets for source connector: {}", connName); + sourceConnectorOffsets(connName, connector, connectorConfig, cb); + } + } catch (Exception e) { + cb.onCompletion(e, null); + } + }); + } + + /** + * Get the current consumer group offsets for a sink connector. 
+ * @param connName the name of the sink connector whose offsets are to be retrieved + * @param connector the sink connector + * @param connectorConfig the sink connector's configurations + * @param cb callback to invoke upon completion of the request + */ + private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Callback<ConnectorOffsets> cb) { + sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create); + } + + // Visible for testing; allows us to mock out the Admin client for testing + void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) { + Map<String, Object> adminConfig = adminConfigs( + connName, + "connector-worker-adminclient-" + connName, + config, + new SinkConnectorConfig(plugins, connectorConfig), + connector.getClass(), + connectorClientConfigOverridePolicy, + kafkaClusterId, + ConnectorType.SOURCE); + String groupId = (String) baseConsumerConfigs( + connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig), + connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG); + Admin admin = adminFactory.apply(adminConfig); + ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions() + .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions); + listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> { + if (error != null) { + cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null); Review Comment: Thanks, that's a good point. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { } } + @Override + public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) { Review Comment: > I think we may want to put this through the tick thread in the DistributedHerder, to ensure that the connector isn't deleted between when we check to see that it exists in the config backing store and when we try to get its config Hm wouldn't we still be susceptible to that if the request is served on a non-leader worker (since we aren't forwarding get offset requests to the leader)? And isn't that acceptable for a GET API? > We might actually want to tweak the message in the AbstractHerder class to not use the "Submitting... request" language since it's more like we're actually fulfilling the request at that point. I think the submitting request language can still be used because the worker API being called isn't synchronous either? 
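For readers skimming the thread: the `sinkConnectorOffsets` hunk above essentially boils down to an asynchronous `Admin#listConsumerGroupOffsets` call whose result is handed to the caller's callback, with the admin client closed only once the future completes (the same close-inside-`whenComplete` pattern discussed further down). Here's a rough standalone sketch of that lookup — the bootstrap servers and group id are placeholders, the timeout stands in for `ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS`, and the translation into the `ConnectorOffsets` REST entity is elided:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SinkOffsetsLookupSketch {
    public static void main(String[] args) throws InterruptedException {
        // Placeholder config; the Worker derives this from adminConfigs(...) as in the hunk above
        Map<String, Object> adminConfig = Collections.singletonMap(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default sink group id pattern; a consumer.override.group.id would change this
        String groupId = "connect-test-connector";

        Admin admin = Admin.create(adminConfig);
        ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
                .timeoutMs(90_000); // stand-in for ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS
        ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(groupId, options);

        CountDownLatch done = new CountDownLatch(1);
        // Complete asynchronously instead of blocking the request thread;
        // close the admin client only after the future finishes, success or failure.
        result.partitionsToOffsetAndMetadata().whenComplete((offsets, error) -> {
            try {
                if (error != null) {
                    System.err.println("Failed to list offsets for group " + groupId + ": " + error);
                } else {
                    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                        System.out.println(entry.getKey() + " -> " + entry.getValue().offset());
                    }
                }
            } finally {
                Utils.closeQuietly(admin, "admin client for sink connector offsets");
                done.countDown();
            }
        });
        done.await(90, TimeUnit.SECONDS);
    }
}
```

The real method then repackages each topic/partition/offset triple into the `ConnectorOffsets` REST entity, which is what the `WorkerTest` cases further down assert against.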
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java: ########## @@ -109,6 +121,69 @@ public void testThreadName() { .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName())); } + @Test + public void testConnectorPartitions() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> setCallback = mock(Callback.class); + + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true); + + Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>(); + serializedPartitionOffsets.put( + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))), + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Collections.singletonMap("offsetKey", "offsetValue"))) + ); + serializedPartitionOffsets.put( + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))), + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Collections.singletonMap("offsetKey", "offsetValue2"))) + ); + serializedPartitionOffsets.put( + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue2")))), + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Collections.singletonMap("offsetKey", "offsetValue"))) + ); + serializedPartitionOffsets.put( + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Arrays.asList("connector2", Collections.singletonMap("partitionKey", "partitionValue")))), + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Collections.singletonMap("offsetKey", "offsetValue"))) + ); + + store.set(serializedPartitionOffsets, setCallback).get(); + store.stop(); + + // Restore into a new store to ensure correct reload from scratch + FileOffsetBackingStore restore = new FileOffsetBackingStore(jsonConverter); + restore.configure(config); + restore.start(); + + Set<Map<String, Object>> connectorPartitions1 = restore.connectorPartitions("connector1"); + assertEquals(2, connectorPartitions1.size()); + + Set<Map<String, Object>> expectedConnectorPartition1 = new HashSet<>(); + expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue1")); + expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue2")); + + assertTrue(connectorPartitions1.containsAll(expectedConnectorPartition1)); + assertTrue(expectedConnectorPartition1.containsAll(connectorPartitions1)); + + Set<Map<String, Object>> connectorPartitions2 = restore.connectorPartitions("connector2"); + assertEquals(1, connectorPartitions2.size()); + + Set<Map<String, Object>> expectedConnectorPartition2 = Collections.singleton(Collections.singletonMap("partitionKey", "partitionValue")); + + assertTrue(connectorPartitions2.containsAll(expectedConnectorPartition2)); + assertTrue(expectedConnectorPartition2.containsAll(connectorPartitions2)); Review Comment: Makes sense, added. This also made me realize that there was a bug in the `FileOffsetBackingStore` implementation for `connectorPartitions`. 
We were only populating the `connectorPartitions` map in the `load` method which only happens once during startup; we should also be populating it on calls to `save` which gets called on every `OffsetBackingStore::set` (this wasn't an issue in the `KafkaOffsetBackingStore` where the `connectorPartitions` map is populated in the separate thread consuming from the `KafkaBasedLog`). ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt } } + /** + * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the + * request finishes processing. + * + * @param connName the name of the connector whose offsets are to be retrieved + * @param connectorConfig the connector's configurations + * @param cb callback to invoke upon completion of the request + */ + public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) { + executor.submit(() -> { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); + if (ConnectUtils.isSinkConnector(connector)) { + log.debug("Fetching offsets for sink connector: {}", connName); + sinkConnectorOffsets(connName, connector, connectorConfig, cb); + } else { + log.debug("Fetching offsets for source connector: {}", connName); + sourceConnectorOffsets(connName, connector, connectorConfig, cb); + } + } catch (Exception e) { + cb.onCompletion(e, null); + } + }); + } + + /** + * Get the current consumer group offsets for a sink connector. + * @param connName the name of the sink connector whose offsets are to be retrieved + * @param connector the sink connector + * @param connectorConfig the sink connector's configurations + * @param cb callback to invoke upon completion of the request + */ + private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Callback<ConnectorOffsets> cb) { + sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create); + } + + // Visible for testing; allows us to mock out the Admin client for testing + void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) { + Map<String, Object> adminConfig = adminConfigs( + connName, + "connector-worker-adminclient-" + connName, + config, + new SinkConnectorConfig(plugins, connectorConfig), + connector.getClass(), + connectorClientConfigOverridePolicy, + kafkaClusterId, + ConnectorType.SOURCE); + String groupId = (String) baseConsumerConfigs( + connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig), + connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG); + Admin admin = adminFactory.apply(adminConfig); Review Comment: Hmm, in theory there shouldn't be any exceptions thrown in the remainder of the method and the admin client is closed at the end of the `whenComplete` chained to the returned future. 
I'm not using a try-with-resources or a finally block here because we don't want to close the admin client before the `listConsumerGroupOffsets` request completes but I guess it doesn't hurt to wrap this in a catch all try block (also since we're undoing the earlier suggestion of centralizing the try/catch logic, this looks okay). ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.SourceConnectorConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; +import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for Kafka Connect's connector offset management REST APIs + */ +@Category(IntegrationTest.class) +public class OffsetsApiIntegrationTest { + + private static final String 
CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final Integer NUM_TASKS = 2; + private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); + private static final int NUM_WORKERS = 3; + private EmbeddedConnectCluster connect; + + @Before + public void setup() { + // setup Connect worker properties + Map<String, String> workerProps = new HashMap<>(); + workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); + + // build a Connect cluster backed by Kafka and Zk + connect = new EmbeddedConnectCluster.Builder() + .name("connect-cluster") + .numWorkers(NUM_WORKERS) + .workerProps(workerProps) + .build(); + connect.start(); + } + + @After + public void tearDown() { + connect.stop(); + } + + @Test + public void testGetNonExistentConnectorOffsets() { + ConnectRestException e = assertThrows(ConnectRestException.class, + () -> connect.connectorOffsets("non-existent-connector")); + assertEquals(404, e.errorCode()); + } + + @Test + public void testGetSinkConnectorOffsets() throws Exception { + getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); + } + + @Test + public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, Review Comment: Hm I get where you're coming from but I feel like adding anything too involved here would be detracting from this test and actually testing a different part of Connect (client config overrides). So I've just added a simple admin client based check after the call to `getAndVerifySinkConnectorOffsets` to ensure that the overridden group ID exists and the default one doesn't. 
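To make that concrete, the "simple admin client based check" described above can be done with a plain admin client against the embedded Kafka cluster. This is a hypothetical helper rather than the exact test code in the PR; it assumes JUnit 4 asserts and the `SinkUtils.consumerGroupId` naming used elsewhere in this PR:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.connect.util.SinkUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class GroupIdOverrideCheckSketch {

    // bootstrapServers, connectorName, and overriddenGroupId are placeholders supplied by the test
    static void assertOverriddenGroupIdUsed(String bootstrapServers, String connectorName,
                                            String overriddenGroupId) throws Exception {
        Map<String, Object> adminConfig = Collections.singletonMap(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(adminConfig)) {
            Set<String> groups = admin.listConsumerGroups().all().get(30, TimeUnit.SECONDS).stream()
                    .map(ConsumerGroupListing::groupId)
                    .collect(Collectors.toSet());
            // The overridden group id should exist once the sink connector has committed offsets...
            assertTrue(groups.contains(overriddenGroupId));
            // ...and the default connect-<connector> group should never have been created at all.
            assertFalse(groups.contains(SinkUtils.consumerGroupId(connectorName)));
        }
    }
}
```

The absence check on the default group is the part that actually proves the override took effect, which keeps the test focused without re-testing client config overrides in depth.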
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java: ########## @@ -1722,6 +1727,128 @@ public void testZombieFencing() { verifyGenericIsolation(); } + @Test + @SuppressWarnings("unchecked") + public void testGetSinkConnectorOffsets() throws Exception { + mockKafkaClusterId(); + + String connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, + allConnectorClientConfigOverridePolicy); + worker.start(); + + Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = + Collections.singletonMap(new TopicPartition("test-topic", 0), new OffsetAndMetadata(10)); + Map<String, Map<TopicPartition, OffsetAndMetadata>> consumerGroupToOffsetsMap = + Collections.singletonMap(SinkUtils.consumerGroupId(CONNECTOR_ID), consumerGroupOffsets); + + Admin admin = mock(Admin.class); + ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class); + when(admin.listConsumerGroupOffsets(anyString(), any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result); + KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> adminFuture = mock(KafkaFuture.class); + when(result.all()).thenReturn(adminFuture); + when(adminFuture.whenComplete(any())).thenAnswer(invocation -> { + ((KafkaFuture.BiConsumer<Map<String, Map<TopicPartition, OffsetAndMetadata>>, Throwable>) invocation.getArgument(0)) + .accept(consumerGroupToOffsetsMap, null); + return null; + }); + + FutureCallback<ConnectorOffsets> cb = new FutureCallback<>(); + worker.sinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, cb, config -> admin); + ConnectorOffsets offsets = cb.get(1000, TimeUnit.MILLISECONDS); + + assertEquals(1, offsets.offsets().size()); + assertEquals(10L, offsets.offsets().get(0).offset().get(SinkUtils.KAFKA_OFFSET_KEY)); + assertEquals(0, offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_PARTITION_KEY)); + assertEquals("test-topic", offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_TOPIC_KEY)); + + verify(admin).listConsumerGroupOffsets(eq(SinkUtils.consumerGroupId(CONNECTOR_ID)), any(ListConsumerGroupOffsetsOptions.class)); + verify(admin).close(); + verifyKafkaClusterId(); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + mockKafkaClusterId(); + + String connectorClass = SampleSinkConnector.class.getName(); + connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); + connectorProps.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, "overridden-group-id"); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, + allConnectorClientConfigOverridePolicy); + worker.start(); + + Map<String, Map<TopicPartition, OffsetAndMetadata>> consumerGroupToOffsetsMap = new HashMap<>(); + consumerGroupToOffsetsMap.put("overridden-group-id", Collections.singletonMap(new TopicPartition("test-topic", 0), new OffsetAndMetadata(10))); + consumerGroupToOffsetsMap.put(SinkUtils.consumerGroupId(CONNECTOR_ID), + Collections.singletonMap(new TopicPartition("test-topic-2", 1), new OffsetAndMetadata(0))); + + Admin admin = mock(Admin.class); + ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class); + when(admin.listConsumerGroupOffsets(anyString(), 
any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result); + KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> adminFuture = mock(KafkaFuture.class); + when(result.all()).thenReturn(adminFuture); + when(adminFuture.whenComplete(any())).thenAnswer(invocation -> { + ((KafkaFuture.BiConsumer<Map<String, Map<TopicPartition, OffsetAndMetadata>>, Throwable>) invocation.getArgument(0)) + .accept(consumerGroupToOffsetsMap, null); + return null; + }); + + FutureCallback<ConnectorOffsets> cb = new FutureCallback<>(); + worker.sinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, cb, config -> admin); + ConnectorOffsets offsets = cb.get(1000, TimeUnit.MILLISECONDS); + + assertEquals(1, offsets.offsets().size()); + assertEquals(10L, offsets.offsets().get(0).offset().get(SinkUtils.KAFKA_OFFSET_KEY)); + assertEquals(0, offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_PARTITION_KEY)); + assertEquals("test-topic", offsets.offsets().get(0).partition().get(SinkUtils.KAFKA_TOPIC_KEY)); + + verify(admin).listConsumerGroupOffsets(eq("overridden-group-id"), any(ListConsumerGroupOffsetsOptions.class)); + verify(admin).close(); + verifyKafkaClusterId(); + } + + @Test + public void testGetSourceConnectorOffsets() throws Exception { Review Comment: I've added a test case where `offsetStore::connectorPartitions` throws an exception and verified that the store is stopped. The callback invocation is indirectly tested through the use of a `FutureCallback` in the test and making assertions on the exception thrown by `FutureCallback::get`. ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java: ########## @@ -460,6 +471,74 @@ public void testClientIds() { assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG)); } + @Test + public void testConnectorPartitions() throws Exception { + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true); + store = spy(new KafkaOffsetBackingStore(() -> { + fail("Should not attempt to instantiate admin in these tests"); + return null; + }, () -> CLIENT_ID_BASE, jsonConverter)); + + doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(), + capturedConsumerProps.capture(), capturedConsumedCallback.capture(), + capturedNewTopic.capture(), capturedAdminSupplier.capture()); + + store.configure(mockConfig(props)); + store.start(); + + verify(storeLog).start(); + + doAnswer(invocation -> { + capturedConsumedCallback.getValue().onCompletion(null, + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, + jsonConverter.fromConnectData("", null, Arrays.asList("connector1", + Collections.singletonMap("partitionKey", "partitionValue1"))), TP0_VALUE.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, + new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, + jsonConverter.fromConnectData("", null, Arrays.asList("connector1", + Collections.singletonMap("partitionKey", "partitionValue1"))), TP1_VALUE.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, + new ConsumerRecord<>(TOPIC, 2, 0, 0L, TimestampType.CREATE_TIME, 0, 0, + jsonConverter.fromConnectData("", null, Arrays.asList("connector1", + Collections.singletonMap("partitionKey", "partitionValue2"))), TP2_VALUE.array(), + new 
RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, + new ConsumerRecord<>(TOPIC, 3, 0, 0L, TimestampType.CREATE_TIME, 0, 0, + jsonConverter.fromConnectData("", null, Arrays.asList("connector2", + Collections.singletonMap("partitionKey", "partitionValue"))), TP1_VALUE.array(), + new RecordHeaders(), Optional.empty())); + storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null); + return null; + }).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture()); + + // Trigger a read to the end of the log + store.get(Collections.emptyList()).get(10000, TimeUnit.MILLISECONDS); + + Set<Map<String, Object>> connectorPartitions1 = store.connectorPartitions("connector1"); + assertEquals(2, connectorPartitions1.size()); + + Set<Map<String, Object>> expectedConnectorPartition1 = new HashSet<>(); + expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue1")); + expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue2")); + + assertTrue(connectorPartitions1.containsAll(expectedConnectorPartition1)); + assertTrue(expectedConnectorPartition1.containsAll(connectorPartitions1)); Review Comment: Ah nope, leftover from earlier where I was using a List rather than a Set and we didn't care about the order. ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java: ########## @@ -109,6 +121,69 @@ public void testThreadName() { .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName())); } + @Test + public void testConnectorPartitions() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> setCallback = mock(Callback.class); + + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true); + + Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>(); + serializedPartitionOffsets.put( + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))), + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Collections.singletonMap("offsetKey", "offsetValue"))) + ); + serializedPartitionOffsets.put( Review Comment: Whoops yeah, the intention was to test the case where new offsets are written for partitions that are already known and ensure that the number of partitions doesn't increase. I've split it into separate `OffsetBackingStore::set` calls. ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java: ########## @@ -191,12 +194,16 @@ public void start(Map<String, String> props) { @Override public List<SourceRecord> poll() { if (!stopped) { + // Don't return any more records since we've already produced the configured maximum number. + if (seqno >= maxMessages) { + return null; + } if (throttler.shouldThrottle(seqno - startingSeqno, System.currentTimeMillis())) { throttler.throttle(); } taskHandle.record(batchSize); Review Comment: Good suggestion, I've moved it to a new variable called `currentBatchSize`. 
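On the split-`set` point a couple of comments up: the reworked `FileOffsetBackingStoreTest` section is roughly shaped like the fragment below. It reuses the `store`, `jsonConverter`, and `setCallback` fixtures from the quoted test, the literal partition/offset values are illustrative, and `connectorPartitions` is the accessor introduced by this PR — so treat this as a sketch of the intent rather than the committed test:

```java
// Two separate set() calls for the same source partition, to check that re-writing an
// already-known partition overwrites its offset without growing connectorPartitions()
Map<ByteBuffer, ByteBuffer> firstWrite = Collections.singletonMap(
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Collections.singletonMap("offsetKey", "offsetValue"))));
Map<ByteBuffer, ByteBuffer> secondWrite = Collections.singletonMap(
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))),
        ByteBuffer.wrap(jsonConverter.fromConnectData("", null,
                Collections.singletonMap("offsetKey", "offsetValue2"))));

store.set(firstWrite, setCallback).get();
store.set(secondWrite, setCallback).get();

// Still a single known partition for connector1, even though its offset was overwritten
assertEquals(1, store.connectorPartitions("connector1").size());
```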
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java: ########## @@ -109,6 +121,69 @@ public void testThreadName() { .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName())); } + @Test + public void testConnectorPartitions() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> setCallback = mock(Callback.class); + + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true); + + Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>(); + serializedPartitionOffsets.put( + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Arrays.asList("connector1", Collections.singletonMap("partitionKey", "partitionValue1")))), + ByteBuffer.wrap(jsonConverter.fromConnectData("", null, + Collections.singletonMap("offsetKey", "offsetValue"))) Review Comment: Nice 👍
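As a footnote on the `FileOffsetBackingStore` bug called out earlier in the thread (connector partitions only being tracked during `load`), the fix amounts to also refreshing the tracking map whenever offsets are written. A minimal sketch of that idea, not the actual patch — the `connectorPartitions` and `converter` field names and the helper method are assumptions, and only the new bookkeeping is shown:

```java
// Assumed fields on the store: a JsonConverter 'converter' and a
// Map<String, Set<Map<String, Object>>> 'connectorPartitions' keyed by connector name.
// Offset keys are serialized as ["<connector>", <partition map>], as in the tests above.
private void trackDeserializedPartition(ByteBuffer serializedKey) {
    List<?> deserializedKey = (List<?>) converter.toConnectData("", serializedKey.array()).value();
    String connectorName = (String) deserializedKey.get(0);
    @SuppressWarnings("unchecked")
    Map<String, Object> partition = (Map<String, Object>) deserializedKey.get(1);
    connectorPartitions.computeIfAbsent(connectorName, ignored -> new HashSet<>()).add(partition);
}

@Override
protected void save() {
    // ... existing file write of the 'data' map ...
    // Also refresh partition tracking on every write, not just on the initial load()
    for (ByteBuffer key : data.keySet()) {
        if (key != null) {
            trackDeserializedPartition(key);
        }
    }
}
```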