yashmayya commented on code in PR #14003: URL: https://github.com/apache/kafka/pull/14003#discussion_r1263263703
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1608,6 +1612,32 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
         }));
     }
 
+    /**
+     * "Normalize" source connector offsets by serializing and deserializing them using the internal {@link JsonConverter}.
+     * This is done in order to prevent type mismatches between the offsets passed to {@link SourceConnector#alterOffsets(Map, Map)}
+     * and the offsets that connectors and tasks retrieve via an instance of {@link OffsetStorageReader}.
+     * <p>
+     * Visible for testing.
+     *
+     * @param originalOffsets the offsets that are to be normalized
+     * @return the normalized offsets
+     */
+    @SuppressWarnings("unchecked")
+    Map<Map<String, ?>, Map<String, ?>> normalizeSourceConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> originalOffsets) {
+        Map<Map<String, ?>, Map<String, ?>> normalizedOffsets = new HashMap<>();
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : originalOffsets.entrySet()) {
+            OffsetUtils.validateFormat(entry.getKey());
+            OffsetUtils.validateFormat(entry.getValue());
+            byte[] serializedKey = internalKeyConverter.fromConnectData("", null, entry.getKey());
+            byte[] serializedValue = internalKeyConverter.fromConnectData("", null, entry.getValue());

Review Comment:
   Whoops, thanks. The `JsonConverter` doesn't seem to care whether it's working on keys or values (also doesn't seem to care about the topic name) but this does make sense for consistency's sake 👍
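
For readers following along, the type mismatch that the Javadoc in this hunk refers to can be sketched with the standard library alone. This is only an illustration, not the PR's implementation: `widen` and `normalize` are hypothetical stand-ins for the serialize/deserialize round trip, and the sketch assumes that a schemaless JSON round trip hands integral numbers back as `Long` and floating-point numbers as `Double`.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetNormalizationSketch {

    // Hypothetical stand-in for the JSON round trip (assumption: integral
    // values come back as Long and floating-point values as Double).
    static Object widen(Object value) {
        if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
            return ((Number) value).longValue();
        }
        if (value instanceof Float) {
            return ((Number) value).doubleValue();
        }
        return value;
    }

    // Applies the widening to every value of a single offset map.
    static Map<String, Object> normalize(Map<String, ?> offset) {
        Map<String, Object> normalized = new HashMap<>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            normalized.put(entry.getKey(), widen(entry.getValue()));
        }
        return normalized;
    }

    public static void main(String[] args) {
        // Offset as it might arrive in an alter-offsets request: an Integer.
        Map<String, Object> fromRequest = new HashMap<>();
        fromRequest.put("position", 10);

        // Offset as a task might read it back after storage: a Long.
        Map<String, Object> fromReader = new HashMap<>();
        fromReader.put("position", 10L);

        // Integer.equals(Long) is false, so the raw maps do not match.
        System.out.println(fromRequest.equals(fromReader));
        // After normalization both sides hold a Long and compare equal.
        System.out.println(normalize(fromRequest).equals(fromReader));
    }
}
```

Without normalization, a connector comparing the offsets it is handed against the offsets it reads back could see two logically identical positions as different, which is the situation the round trip is meant to rule out.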