C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1153393915


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,116 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            Connector connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Fetching offsets for sink connector: {}", connName);
+                sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+            } else {
+                log.debug("Fetching offsets for source connector: {}", connName);
+                sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+            }
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+            listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result, error) -> {
+                if (error != null) {
+                    log.error("Failed to retrieve consumer group offsets for sink connector {}", connName, error);
+                    cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
+                } else {
+                    ConnectorOffsets offsets = SinkUtils.consumerGroupOffsetsToConnectorOffsets(result);
+                    cb.onCompletion(null, offsets);
+                }
+                Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            });
+        } catch (Exception e) {
+            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, e), null);
+        }
+    }
+
+    /**
+     * Get the current offsets for a source connector.
+     * @param connName the name of the source connector whose offsets are to be retrieved
+     * @param connector the source connector
+     * @param connectorConfig the source connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                        Callback<ConnectorOffsets> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector);
+        CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        sourceConnectorOffsets(connName, offsetStore, offsetReader, cb);
+    }
+
+    // Visible for testing
+    void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetStore,
+                                CloseableOffsetStorageReader offsetReader, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            try {
+                offsetStore.configure(config);
+                offsetStore.start();
+                Set<Map<String, Object>> connectorPartitions = offsetStore.connectorPartitions(connName);
+                List<ConnectorOffset> connectorOffsets = offsetReader.offsets(connectorPartitions).entrySet().stream()
+                        .map(entry -> new ConnectorOffset(entry.getKey(), entry.getValue()))
+                        .collect(Collectors.toList());
+                cb.onCompletion(null, new ConnectorOffsets(connectorOffsets));
+            } catch (Exception e) {

Review Comment:
   We should catch `Throwable` here, otherwise the callback could be left hanging.
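   
   Something along these lines (purely illustrative; whatever cleanup the existing catch block performs should be kept):
   ```java
               } catch (Throwable t) {
                   log.error("Failed to retrieve offsets for source connector {}", connName, t);
                   cb.onCompletion(new ConnectException("Failed to retrieve offsets for source connector " + connName, t), null);
                   // ... plus the cleanup the existing catch block already does
               }
   ```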



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,116 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            Connector connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Fetching offsets for sink connector: {}", connName);
+                sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+            } else {
+                log.debug("Fetching offsets for source connector: {}", connName);
+                sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+            }
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+            listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result, error) -> {
+                if (error != null) {
+                    log.error("Failed to retrieve consumer group offsets for sink connector {}", connName, error);
+                    cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
+                } else {
+                    ConnectorOffsets offsets = SinkUtils.consumerGroupOffsetsToConnectorOffsets(result);
+                    cb.onCompletion(null, offsets);
+                }
+                Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            });
+        } catch (Exception e) {

Review Comment:
   Should be `Throwable`, otherwise we may leak the admin client.
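   
   For example, the same body as the existing catch clause, just widened to `Throwable`:
   ```java
           } catch (Throwable t) {
               Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
               cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, t), null);
           }
   ```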



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java:
##########
@@ -109,8 +124,82 @@ public void testThreadName() {
                 .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName()));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = mock(Callback.class);
+
+        // This test actually requires the offset store to track deserialized source partitions, so we can't use the member variable mock converter
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+
+        Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>();
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+        store.set(serializedPartitionOffsets, setCallback).get();
+
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue2"))
+        );
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue2")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector2", Collections.singletonMap("partitionKey", "partitionValue")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+
+        store.set(serializedPartitionOffsets, setCallback).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore restore = new FileOffsetBackingStore(jsonConverter);
+        restore.configure(config);
+        restore.start();
+
+        Set<Map<String, Object>> connectorPartitions1 = restore.connectorPartitions("connector1");
+        assertEquals(2, connectorPartitions1.size());
+
+        Set<Map<String, Object>> expectedConnectorPartition1 = new HashSet<>();
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue1"));
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue2"));
+        assertEquals(expectedConnectorPartition1, connectorPartitions1);
+
+        Set<Map<String, Object>> connectorPartitions2 = restore.connectorPartitions("connector2");
+        assertEquals(1, connectorPartitions2.size());
+
+        Set<Map<String, Object>> expectedConnectorPartition2 = Collections.singleton(Collections.singletonMap("partitionKey", "partitionValue"));
+        assertEquals(expectedConnectorPartition2, connectorPartitions2);
+
+        serializedPartitionOffsets.clear();
+        // Null valued offset for a partition key should remove that partition for the connector
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")),
+                null
+        );
+        restore.set(serializedPartitionOffsets, setCallback).get();
+        connectorPartitions1 = restore.connectorPartitions("connector1");
+        assertEquals(1, connectorPartitions1.size());
Review Comment:
   Should do a comparison against an expected set of connector partitions here instead of just the size.
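   
   For example (assuming the tombstone above is meant to leave only `partitionValue2` behind for `connector1`):
   ```java
           Set<Map<String, Object>> expectedConnectorPartitions = Collections.singleton(
                   Collections.singletonMap("partitionKey", "partitionValue2"));
           assertEquals(expectedConnectorPartitions, restore.connectorPartitions("connector1"));
   ```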



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -460,6 +471,70 @@ public void testClientIds() {
         assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+        store = spy(new KafkaOffsetBackingStore(() -> {
+            fail("Should not attempt to instantiate admin in these tests");
+            return null;
+        }, () -> CLIENT_ID_BASE, jsonConverter));
+
+        doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
+                capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
+                capturedNewTopic.capture(), capturedAdminSupplier.capture());
+
+        store.configure(mockConfig(props));
+        store.start();
+
+        verify(storeLog).start();
+
+        doAnswer(invocation -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue1"))), TP0_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue1"))), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue2"))), TP2_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector2",
+                                    Collections.singletonMap("partitionKey", "partitionValue"))), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
+            return null;
+        }).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());
+
+        // Trigger a read to the end of the log
+        store.get(Collections.emptyList()).get(10000, TimeUnit.MILLISECONDS);
+
+        Set<Map<String, Object>> connectorPartitions1 = store.connectorPartitions("connector1");
+        assertEquals(2, connectorPartitions1.size());
+
+        Set<Map<String, Object>> expectedConnectorPartition1 = new HashSet<>();
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue1"));
+        expectedConnectorPartition1.add(Collections.singletonMap("partitionKey", "partitionValue2"));
+        assertEquals(expectedConnectorPartition1, connectorPartitions1);
+
+        Set<Map<String, Object>> connectorPartitions2 = store.connectorPartitions("connector2");
+        assertEquals(1, connectorPartitions2.size());

Review Comment:
   (Think this can/should be removed)



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {

Review Comment:
   Ah yeah, forgot that we don't forward to the leader for this.
   
   In that case, I think we should go back to using a snapshot-based approach in the `AbstractHerder::connectorOffsets` method, which will guarantee a consistent view of the config topic for the remainder of the method:
   ```java
       @Override
       public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
           log.trace("Submitting offset fetch request for connector: {}", connName);
           ClusterConfigState snapshot = configBackingStore.snapshot();
           try {
               if (!snapshot.contains(connName)) {
                   cb.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                   return;
               }
               // The worker asynchronously processes the request and completes the passed callback when done
               worker.connectorOffsets(connName, snapshot.connectorConfig(connName), cb);
           } catch (Throwable t) {
               cb.onCompletion(t, null);
           }
       }
   ```
   
   I'd also still recommend doing it on the tick thread for the distributed herder just to avoid any potential concurrency headaches; the operations we perform before things get delegated to another thread by the `Worker` class are lightweight enough that there shouldn't be concerns about unnecessarily blocking the herder's tick thread.
   
   RE the "submitting request" language: you're not wrong, but so far we've used that language exclusively when submitting requests to the herder queue, and it'd be nice if we can tweak the language in other places to preserve that distinction.
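   
   For the distributed herder, a rough sketch of keeping this on the tick thread (the `addRequest`/`forwardErrorCallback` usage here is just illustrative, not a prescriptive implementation):
   ```java
       @Override
       public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
           log.trace("Submitting offset fetch request for connector: {}", connName);
           addRequest(() -> {
               // Delegates to the snapshot-based AbstractHerder logic above; the worker then hands
               // the actual offset read off to another thread, so the tick thread isn't blocked
               super.connectorOffsets(connName, cb);
               return null;
           }, forwardErrorCallback(cb));
       }
   ```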



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -460,6 +471,70 @@ public void testClientIds() {
         assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+        store = spy(new KafkaOffsetBackingStore(() -> {
+            fail("Should not attempt to instantiate admin in these tests");
+            return null;
+        }, () -> CLIENT_ID_BASE, jsonConverter));
+
+        doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
+                capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
+                capturedNewTopic.capture(), capturedAdminSupplier.capture());
+
+        store.configure(mockConfig(props));
+        store.start();
+
+        verify(storeLog).start();
+
+        doAnswer(invocation -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue1"))), TP0_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue1"))), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
+                                    Collections.singletonMap("partitionKey", "partitionValue2"))), TP2_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0,
+                            jsonConverter.fromConnectData("", null, Arrays.asList("connector2",
+                                    Collections.singletonMap("partitionKey", "partitionValue"))), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
+            return null;
+        }).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());
+
+        // Trigger a read to the end of the log
+        store.get(Collections.emptyList()).get(10000, TimeUnit.MILLISECONDS);
+
+        Set<Map<String, Object>> connectorPartitions1 = store.connectorPartitions("connector1");
+        assertEquals(2, connectorPartitions1.size());

Review Comment:
   (Think this can/should be removed)



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java:
##########
@@ -109,8 +124,82 @@ public void testThreadName() {
                 .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName()));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = mock(Callback.class);
+
+        // This test actually requires the offset store to track deserialized source partitions, so we can't use the member variable mock converter
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+
+        Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>();
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+        store.set(serializedPartitionOffsets, setCallback).get();
+
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue2"))
+        );
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue2")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector2", Collections.singletonMap("partitionKey", "partitionValue")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+
+        store.set(serializedPartitionOffsets, setCallback).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore restore = new FileOffsetBackingStore(jsonConverter);
+        restore.configure(config);
+        restore.start();
+
+        Set<Map<String, Object>> connectorPartitions1 = restore.connectorPartitions("connector1");
+        assertEquals(2, connectorPartitions1.size());

Review Comment:
   This is unnecessary now, right? If so, we should remove it since the later `assertEquals(expectedConnectorPartition1, connectorPartitions1);` will come with a more-detailed error message if it fails.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -460,6 +471,70 @@ public void testClientIds() {
         assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {

Review Comment:
   Should we augment this test to cover handling of tombstone records, similar to what we did with the `FileOffsetBackingStoreTest::testConnectorPartitions` case?
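   
   If so, a tombstone would just be another record in the `doAnswer` block with a null value, something like (illustrative; the offset and partition constants would need to line up with the rest of the test):
   ```java
               capturedConsumedCallback.getValue().onCompletion(null,
                       new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0,
                               jsonConverter.fromConnectData("", null, Arrays.asList("connector1",
                                       Collections.singletonMap("partitionKey", "partitionValue2"))), null,
                               new RecordHeaders(), Optional.empty()));
   ```
   after which `store.connectorPartitions("connector1")` should only contain the `partitionValue1` partition.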


