yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1153487072


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java:
##########
@@ -109,8 +124,82 @@ public void testThreadName() {
                 .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName()));
     }
 
+    @Test
+    public void testConnectorPartitions() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = mock(Callback.class);
+
+        // This test actually requires the offset store to track deserialized source partitions, so we can't use the member variable mock converter
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true);
+
+        Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>();
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+        store.set(serializedPartitionOffsets, setCallback).get();
+
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue2"))
+        );
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue2")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+        serializedPartitionOffsets.put(
+                serializeKey(jsonConverter, "connector2", Collections.singletonMap("partitionKey", "partitionValue")),
+                serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue"))
+        );
+
+        store.set(serializedPartitionOffsets, setCallback).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore restore = new FileOffsetBackingStore(jsonConverter);
+        restore.configure(config);
+        restore.start();
+
+        Set<Map<String, Object>> connectorPartitions1 = restore.connectorPartitions("connector1");
+        assertEquals(2, connectorPartitions1.size());

Review Comment:
   Yep, sorry, forgot to clean it up 🤦 


