C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202684769


##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:
##########
@@ -52,4 +53,82 @@ public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicP
 
         return new ConnectorOffsets(connectorOffsets);
     }
+
+    /**
+     * Validate that the provided partitions (keys in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *       "kafka_topic": "topic"
+     *       "kafka_partition": 3
+     *     }
+     * </pre>
+     *
+     * and that the provided offsets (values in the {@code partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *       "kafka_offset": 1000
+     *     }
+     * </pre>
+     *
+     * This method then parses them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
+     * valued offsets.
+     *
+     * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
+     * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+     *
+     * @throws BadRequestException if the provided offsets aren't in the expected format
+     */
+    public static Map<TopicPartition, Long> validateAndParseSinkConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {

Review Comment:
   Well done, looks great 👍
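
   For readers following along without the full diff, here is a minimal sketch of the validate-and-parse step the Javadoc describes. It is not the code from this PR: the class name `SinkOffsetsSketch`, the `topicPartition` helper (a plain map entry standing in for `TopicPartition`), the use of `IllegalArgumentException` in place of `BadRequestException`, and the choice to reject null or non-integral values are all assumptions made so the example runs without Kafka on the classpath.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained illustration; not the SinkUtils implementation.
final class SinkOffsetsSketch {

    // Stand-in for Kafka's TopicPartition: an immutable (topic, partition) pair.
    public static Map.Entry<String, Integer> topicPartition(String topic, int partition) {
        return new SimpleImmutableEntry<>(topic, partition);
    }

    // Accept only the integral types a JSON parser would typically produce.
    private static boolean isIntegral(Object value) {
        return value instanceof Integer || value instanceof Long;
    }

    public static Map<Map.Entry<String, Integer>, Long> parse(
            Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {
        Map<Map.Entry<String, Integer>, Long> parsed = new HashMap<>();
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : partitionOffsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            Map<String, ?> offset = entry.getValue();
            // Assumption: this sketch rejects null keys and values outright.
            if (partition == null || offset == null) {
                throw new IllegalArgumentException("Partition and offset maps must be non-null");
            }
            Object topic = partition.get("kafka_topic");
            Object partitionNumber = partition.get("kafka_partition");
            Object offsetValue = offset.get("kafka_offset");
            if (!(topic instanceof String)) {
                throw new IllegalArgumentException("'kafka_topic' must be a string");
            }
            if (!isIntegral(partitionNumber)) {
                throw new IllegalArgumentException("'kafka_partition' must be an integer");
            }
            if (!isIntegral(offsetValue)) {
                throw new IllegalArgumentException("'kafka_offset' must be an integer");
            }
            parsed.put(
                    topicPartition((String) topic, ((Number) partitionNumber).intValue()),
                    ((Number) offsetValue).longValue());
        }
        return parsed;
    }
}
```

   Under these assumptions, passing the partition `{"kafka_topic": "topic", "kafka_partition": 3}` with the offset `{"kafka_offset": 1000}` yields a single entry mapping `("topic", 3)` to `1000L`, and a missing or mistyped field raises an exception instead of being silently skipped.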



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org