fvaleri commented on code in PR #14567:
URL: https://github.com/apache/kafka/pull/14567#discussion_r1364960461


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -266,7 +262,29 @@ private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartition
     private Long loadOffset(TopicPartition topicPartition) {
         Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
         Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
-        return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
+        return MirrorUtils.unwrapOffset(wrappedOffset);
+    }
+
+    // visible for testing
+    void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
+        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
+        consumer.assign(topicPartitionOffsets.keySet());
+        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()

Review Comment:
   I just thought that, since we already have an info-level message at the end 
of `MirrorSourceTask::start`, this message was not that useful. I don't have a 
strong opinion, though, so I'm fine with leaving it as it is.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -302,6 +320,10 @@ private static int byteSize(byte[] bytes) {
         }
     }
 
+    private boolean isUncommitted(Long offset) {
+        return offset == null || offset < 0;

Review Comment:
   Ok. Thanks for exploring that suggestion.
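
For readers following the thread, here is a minimal, self-contained sketch of how the `isUncommitted` predicate from the hunk above could feed the count reported by the "previously uncommitted partitions" log message. The quoted diff is cut off after `.stream()`, so the filter-and-count step is an assumption rather than the PR's actual code. The class name `UncommittedOffsetsSketch`, the helper `countUncommitted`, and the use of plain `String` keys in place of `TopicPartition` are all hypothetical, chosen only to keep the example free of Kafka dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of MirrorSourceTask.
public class UncommittedOffsetsSketch {

    // Same predicate as in the diff: a missing (null) or negative offset
    // means nothing has been committed for that partition yet.
    static boolean isUncommitted(Long offset) {
        return offset == null || offset < 0;
    }

    // Assumed counting step: how many partitions have no committed offset.
    // String keys stand in for TopicPartition in this sketch.
    static long countUncommitted(Map<String, Long> offsets) {
        return offsets.values().stream()
                .filter(UncommittedOffsetsSketch::isUncommitted)
                .count();
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 41L);   // committed offset
        offsets.put("topic-1", -1L);   // negative: treated as uncommitted
        offsets.put("topic-2", null);  // missing: treated as uncommitted
        System.out.println(countUncommitted(offsets)); // prints 2
    }
}
```

Note that the null check has to come first: `offset < 0` unboxes the `Long`, which would throw a `NullPointerException` for a missing offset if the order were reversed.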



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
