fvaleri commented on code in PR #14567: URL: https://github.com/apache/kafka/pull/14567#discussion_r1363556559
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -266,7 +262,29 @@ private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartition
     private Long loadOffset(TopicPartition topicPartition) {
         Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
         Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
-        return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
+        return MirrorUtils.unwrapOffset(wrappedOffset);
+    }
+
+    // visible for testing
+    void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
+        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
+        consumer.assign(topicPartitionOffsets.keySet());
+        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()

Review Comment:
   ```suggestion
           log.debug("Found {} uncommitted partitions.", topicPartitionOffsets.values().stream()
   ```

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -302,6 +320,10 @@ private static int byteSize(byte[] bytes) {
         }
     }

+    private boolean isUncommitted(Long offset) {
+        return offset == null || offset < 0;

Review Comment:
   Looks like `unwrapOffset` already takes care of the null check. We can simply use the primitive type here and check for equality with -1L (it can't have any other value). Wdyt?
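
   To make the `isUncommitted` suggestion concrete, here is a minimal, self-contained sketch. It is not the PR code: the class name `UncommittedCheckSketch` is hypothetical, and the local `unwrapOffset` is only a stand-in that assumes the real `MirrorUtils.unwrapOffset` returns -1L when no offset is stored, as described above.

   ```java
   import java.util.Collections;
   import java.util.Map;

   public class UncommittedCheckSketch {

       // Stand-in for MirrorUtils.unwrapOffset (assumption: a missing or null
       // offset is mapped to -1L, so callers never see null).
       static long unwrapOffset(Map<String, ?> wrappedOffset) {
           if (wrappedOffset == null || wrappedOffset.get("offset") == null) {
               return -1L;
           }
           return (Long) wrappedOffset.get("offset");
       }

       // With the null case handled upstream, a primitive parameter and an
       // equality check against the -1L sentinel are sufficient.
       static boolean isUncommitted(long offset) {
           return offset == -1L;
       }

       public static void main(String[] args) {
           // No stored offset: treated as uncommitted.
           System.out.println(isUncommitted(unwrapOffset(null)));
           // Stored offset 42: treated as committed.
           System.out.println(isUncommitted(unwrapOffset(Collections.singletonMap("offset", 42L))));
       }
   }
   ```

   Using `long` also avoids the unboxing that `offset < 0` would do on a `Long`. This holds only as long as -1L stays the sole sentinel that `unwrapOffset` can produce.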