C0urante commented on code in PR #13905:
URL: https://github.com/apache/kafka/pull/13905#discussion_r1247968250


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -31,25 +31,36 @@
 import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
+import org.mockito.internal.util.collections.Sets;

Review Comment:
   Nit: probably better to avoid depending on internal packages if we can avoid 
it. Left some suggestions on how to do this without much additional work below



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
         Map<TopicPartition, Long> topicPartitionOffsets = 
loadOffsets(taskTopicPartitions);
         consumer.assign(topicPartitionOffsets.keySet());
         log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.entrySet().stream()
-            .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+            .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());

Review Comment:
   Nit: we can clean this up if we use the `isUncommitted` method
   
   ```suggestion
           log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.values().stream()
                   .filter(this::isUncommitted).count());
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -214,6 +225,82 @@ public void testPoll() {
         }
     }
 
+    @Test
+    public void testSeekBehaviorDuringStart() {
+        // Setting up mock behavior.
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class);
+
+        String sourceClusterName = "sourceCluster";
+        MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class);
+
+        SourceTaskContext mockSourceTaskContext = 
mock(SourceTaskContext.class);
+        OffsetStorageReader mockOffsetStorageReader = 
mock(OffsetStorageReader.class);
+        
when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
+
+        MockedStatic<MirrorUtils> mockMirrorUtils = 
mockStatic(MirrorUtils.class, new CallsRealMethods());
+        mockMirrorUtils.when(() -> 
MirrorUtils.newConsumer(anyMap())).thenReturn(mockConsumer);
+        mockMirrorUtils.when(() -> 
MirrorUtils.newProducer(anyMap())).thenReturn(mockProducer);
+
+        Set<TopicPartition> topicPartitions = Sets.newSet(

Review Comment:
   ```suggestion
           Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
   ```
   
   (will also require a couple new imports and an additional trailing 
parenthesis)



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
         Map<TopicPartition, Long> topicPartitionOffsets = 
loadOffsets(taskTopicPartitions);
         consumer.assign(topicPartitionOffsets.keySet());
         log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.entrySet().stream()
-            .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+            .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());
+
+        log.trace("Seeking offsets: {}", 
topicPartitionOffsets.entrySet().stream()
+                .filter(topicPartitionOffset ->
+                        topicPartitionOffset.getValue() != 
NON_EXISTING_OFFSET_VALUE));
+
+        topicPartitionOffsets.forEach(this::maybeSeek);
         log.info("{} replicating {} topic-partitions {}->{}: {}.", 
Thread.currentThread().getName(),
             taskTopicPartitions.size(), sourceClusterAlias, 
config.targetClusterAlias(), taskTopicPartitions);
     }
 
+    private void maybeSeek(TopicPartition topicPartition, Long offset) {
+        // Do not call seek on partitions that don't have an existing offset 
committed.
+        if (offset == NON_EXISTING_OFFSET_VALUE) {

Review Comment:
   If we use `isUncommitted`:
   ```suggestion
           if (isUncommitted(offset)) {
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -106,13 +107,28 @@ public void start(Map<String, String> props) {
         Map<TopicPartition, Long> topicPartitionOffsets = 
loadOffsets(taskTopicPartitions);
         consumer.assign(topicPartitionOffsets.keySet());
         log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.entrySet().stream()
-            .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+            .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count());
+
+        log.trace("Seeking offsets: {}", 
topicPartitionOffsets.entrySet().stream()
+                .filter(topicPartitionOffset ->
+                        topicPartitionOffset.getValue() != 
NON_EXISTING_OFFSET_VALUE));
+
+        topicPartitionOffsets.forEach(this::maybeSeek);

Review Comment:
   Perhaps to simplify testing and obviate the need for static mocking, we can 
pull out some of this logic into a separate method?
   
   I'm imagining something like this:
   
   ```java
   // visible for testing
   void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
       Map<TopicPartition, Long> topicPartitionOffsets = 
loadOffsets(taskTopicPartitions);
       consumer.assign(topicPartitionOffsets.keySet());
       log.info("Starting with {} previously uncommitted partitions.", 
topicPartitionOffsets.values().stream()
               .filter(this::isUncommitted).count());
   
       log.trace("Seeking offsets: {}", 
topicPartitionOffsets.entrySet().stream()
               .filter(tpo -> !isUncommitted(tpo.getValue())));
   
       topicPartitionOffsets.forEach(this::maybeSeek);
   }
   ```
   
   And then in `start` we would have:
   ```java
   Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
   initializeConsumer(taskTopicPartitions);
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -214,6 +225,82 @@ public void testPoll() {
         }
     }
 
+    @Test
+    public void testSeekBehaviorDuringStart() {
+        // Setting up mock behavior.
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class);
+
+        String sourceClusterName = "sourceCluster";
+        MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class);
+
+        SourceTaskContext mockSourceTaskContext = 
mock(SourceTaskContext.class);
+        OffsetStorageReader mockOffsetStorageReader = 
mock(OffsetStorageReader.class);
+        
when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
+
+        MockedStatic<MirrorUtils> mockMirrorUtils = 
mockStatic(MirrorUtils.class, new CallsRealMethods());

Review Comment:
   I don't think we'll need this if we adopt the isolated `initializeConsumer` 
method, but just in case, we can tweak this to use official API from Mockito 
instead of an internal class:
   ```suggestion
           MockedStatic<MirrorUtils> mockMirrorUtils = 
mockStatic(MirrorUtils.class, CALLS_REAL_METHODS);
   ```
   
   (requires a static import for `org.mockito.Mockito.CALLS_REAL_METHODS`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to