C0urante commented on code in PR #13905: URL: https://github.com/apache/kafka/pull/13905#discussion_r1247968250
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ########## @@ -31,25 +31,36 @@ import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.internal.stubbing.answers.CallsRealMethods; +import org.mockito.internal.util.collections.Sets; Review Comment: Nit: probably better to avoid depending on internal packages if we can avoid it. Left some suggestions on how to do this without much additional work below ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -106,13 +107,28 @@ public void start(Map<String, String> props) { Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions); consumer.assign(topicPartitionOffsets.keySet()); log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() - .filter(x -> x.getValue() == 0L).count()); - log.trace("Seeking offsets: {}", topicPartitionOffsets); - topicPartitionOffsets.forEach(consumer::seek); + .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count()); Review Comment: Nit: we can clean this up if we use the `isUncommitted` method ```suggestion log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() .filter(this::isUncommitted).count()); ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ########## @@ -214,6 +225,82 @@ public void testPoll() { } } + @Test + public void testSeekBehaviorDuringStart() { + // Setting up mock behavior. 
+ @SuppressWarnings("unchecked") + KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class); + + @SuppressWarnings("unchecked") + KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class); + + String sourceClusterName = "sourceCluster"; + MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class); + + SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class); + OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class); + when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader); + + MockedStatic<MirrorUtils> mockMirrorUtils = mockStatic(MirrorUtils.class, new CallsRealMethods()); + mockMirrorUtils.when(() -> MirrorUtils.newConsumer(anyMap())).thenReturn(mockConsumer); + mockMirrorUtils.when(() -> MirrorUtils.newProducer(anyMap())).thenReturn(mockProducer); + + Set<TopicPartition> topicPartitions = Sets.newSet( Review Comment: ```suggestion Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList( ``` (will also require a couple new imports and an additional trailing parenthesis) ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -106,13 +107,28 @@ public void start(Map<String, String> props) { Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions); consumer.assign(topicPartitionOffsets.keySet()); log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() - .filter(x -> x.getValue() == 0L).count()); - log.trace("Seeking offsets: {}", topicPartitionOffsets); - topicPartitionOffsets.forEach(consumer::seek); + .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count()); + + log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream() + .filter(topicPartitionOffset -> + topicPartitionOffset.getValue() != NON_EXISTING_OFFSET_VALUE)); + + topicPartitionOffsets.forEach(this::maybeSeek); log.info("{} replicating {} 
topic-partitions {}->{}: {}.", Thread.currentThread().getName(), taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions); } + private void maybeSeek(TopicPartition topicPartition, Long offset) { + // Do not call seek on partitions that don't have an existing offset committed. + if (offset == NON_EXISTING_OFFSET_VALUE) { Review Comment: If we use `isUncommitted`: ```suggestion if (isUncommitted(offset)) { ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -106,13 +107,28 @@ public void start(Map<String, String> props) { Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions); consumer.assign(topicPartitionOffsets.keySet()); log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() - .filter(x -> x.getValue() == 0L).count()); - log.trace("Seeking offsets: {}", topicPartitionOffsets); - topicPartitionOffsets.forEach(consumer::seek); + .filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count()); + + log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream() + .filter(topicPartitionOffset -> + topicPartitionOffset.getValue() != NON_EXISTING_OFFSET_VALUE)); + + topicPartitionOffsets.forEach(this::maybeSeek); Review Comment: Perhaps to simplify testing and obviate the need for static mocking, we can pull out some of this logic into a separate method? 
I'm imagining something like this: ```java // visible for testing void initializeConsumer(Set<TopicPartition> taskTopicPartitions) { Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions); consumer.assign(topicPartitionOffsets.keySet()); log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() .filter(this::isUncommitted).count()); log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream() .filter(tpo -> !isUncommitted(tpo.getValue()))); topicPartitionOffsets.forEach(this::maybeSeek); } ``` And then in `start` we would have: ```java Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions(); initializeConsumer(taskTopicPartitions); ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ########## @@ -214,6 +225,82 @@ public void testPoll() { } } + @Test + public void testSeekBehaviorDuringStart() { + // Setting up mock behavior. + @SuppressWarnings("unchecked") + KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class); + + @SuppressWarnings("unchecked") + KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class); + + String sourceClusterName = "sourceCluster"; + MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class); + + SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class); + OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class); + when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader); + + MockedStatic<MirrorUtils> mockMirrorUtils = mockStatic(MirrorUtils.class, new CallsRealMethods()); Review Comment: I don't think we'll need this if we adopt the isolated `initializeConsumer` method, but just in case, we can tweak this to use the official API from Mockito instead of an internal class: ```suggestion MockedStatic<MirrorUtils> mockMirrorUtils = mockStatic(MirrorUtils.class, CALLS_REAL_METHODS); ``` (requires a 
static import for `org.mockito.Mockito.CALLS_REAL_METHODS`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org