fvaleri commented on code in PR #14567:
URL: https://github.com/apache/kafka/pull/14567#discussion_r1364960461


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -266,7 +262,29 @@ private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartition
     private Long loadOffset(TopicPartition topicPartition) {
         Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
         Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
-        return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
+        return MirrorUtils.unwrapOffset(wrappedOffset);
+    }
+
+    // visible for testing
+    void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
+        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
+        consumer.assign(topicPartitionOffsets.keySet());
+        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()

Review Comment:
   I just thought that we already have an info level message at the end of `MirrorSourceTask::start` and this message was not that useful, but I don't have a strong opinion, so I'm fine with leaving it as it is.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -302,6 +320,10 @@ private static int byteSize(byte[] bytes) {
         }
     }
 
+    private boolean isUncommitted(Long offset) {
+        return offset == null || offset < 0;

Review Comment:
   Ok. Thanks for exploring that suggestion.



--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org