fvaleri commented on code in PR #14567:
URL: https://github.com/apache/kafka/pull/14567#discussion_r1363556559


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -266,7 +262,29 @@ private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartition
     private Long loadOffset(TopicPartition topicPartition) {
         Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
         Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
-        return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
+        return MirrorUtils.unwrapOffset(wrappedOffset);
+    }
+
+    // visible for testing
+    void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
+        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
+        consumer.assign(topicPartitionOffsets.keySet());
+        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()

Review Comment:
   ```suggestion
           log.debug("Found {} uncommitted partitions.", topicPartitionOffsets.values().stream()
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -302,6 +320,10 @@ private static int byteSize(byte[] bytes) {
         }
     }
 
+    private boolean isUncommitted(Long offset) {
+        return offset == null || offset < 0;

Review Comment:
   It looks like `unwrapOffset` already takes care of the null check, so we can use the primitive `long` here and simply check for equality with `-1L` (the offset can't have any other negative value). What do you think?
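
A minimal sketch of what that simplification might look like. The `unwrapOffset` below is only a stand-in for `MirrorUtils.unwrapOffset`, assumed here to return `-1L` when no offset is stored; the class name `UncommittedCheck` and the `"offset"` key are hypothetical and used purely for illustration.

```java
import java.util.Collections;
import java.util.Map;

public class UncommittedCheck {
    // Hypothetical key name, used only for this illustration.
    static final String OFFSET_KEY = "offset";

    // Stand-in for MirrorUtils.unwrapOffset (assumption): it absorbs the
    // null handling and returns -1 when nothing has been stored yet.
    static long unwrapOffset(Map<String, ?> wrappedOffset) {
        if (wrappedOffset == null || wrappedOffset.get(OFFSET_KEY) == null) {
            return -1L;
        }
        return (Long) wrappedOffset.get(OFFSET_KEY);
    }

    // With the null case handled upstream, a primitive parameter and a
    // single equality check are enough.
    static boolean isUncommitted(long offset) {
        return offset == -1L;
    }

    public static void main(String[] args) {
        // No stored offset: treated as uncommitted.
        System.out.println(isUncommitted(unwrapOffset(null)));
        // A stored offset of 42: treated as committed.
        System.out.println(isUncommitted(
                unwrapOffset(Collections.singletonMap(OFFSET_KEY, 42L))));
    }
}
```

Using a primitive `long` also removes the possibility of a `NullPointerException` from unboxing inside the check, provided the caller never passes a raw `null` offset.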



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.