yashmayya commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1153487072
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java: ########## @@ -109,8 +124,82 @@ public void testThreadName() { .newThread(EMPTY_RUNNABLE).getName().startsWith(FileOffsetBackingStore.class.getSimpleName())); } + @Test + public void testConnectorPartitions() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> setCallback = mock(Callback.class); + + // This test actually requires the offset store to track deserialized source partitions, so we can't use the member variable mock converter + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true); + + Map<ByteBuffer, ByteBuffer> serializedPartitionOffsets = new HashMap<>(); + serializedPartitionOffsets.put( + serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")), + serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue")) + ); + store.set(serializedPartitionOffsets, setCallback).get(); + + serializedPartitionOffsets.put( + serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue1")), + serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue2")) + ); + serializedPartitionOffsets.put( + serializeKey(jsonConverter, "connector1", Collections.singletonMap("partitionKey", "partitionValue2")), + serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue")) + ); + serializedPartitionOffsets.put( + serializeKey(jsonConverter, "connector2", Collections.singletonMap("partitionKey", "partitionValue")), + serialize(jsonConverter, Collections.singletonMap("offsetKey", "offsetValue")) + ); + + store.set(serializedPartitionOffsets, setCallback).get(); + store.stop(); + + // Restore into a new store to ensure correct reload from scratch + FileOffsetBackingStore restore = new FileOffsetBackingStore(jsonConverter); + restore.configure(config); + restore.start(); + + Set<Map<String, Object>> connectorPartitions1 = restore.connectorPartitions("connector1"); + assertEquals(2, connectorPartitions1.size()); Review Comment: Yep, sorry, forgot to clean it up 🤦 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org