This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 51ee89a598b474108363bdb9677842eb21148a82
Author: csolidum <[email protected]>
AuthorDate: Wed Jan 4 03:02:52 2023 -0800

    KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)
    
    
    Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]>
---
 .../apache/kafka/connect/mirror/MirrorCheckpointTask.java  | 14 ++++++++------
 .../kafka/connect/mirror/MirrorCheckpointTaskTest.java     | 10 ++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 959961812ea..29483c126b3 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -188,14 +188,16 @@ public class MirrorCheckpointTask extends SourceTask {
 
     Optional<Checkpoint> checkpoint(String group, TopicPartition 
topicPartition,
                                     OffsetAndMetadata offsetAndMetadata) {
-        long upstreamOffset = offsetAndMetadata.offset();
-        OptionalLong downstreamOffset = 
offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
-        if (downstreamOffset.isPresent()) {
-            return Optional.of(new Checkpoint(group, 
renameTopicPartition(topicPartition),
+        if (offsetAndMetadata != null) {
+            long upstreamOffset = offsetAndMetadata.offset();
+            OptionalLong downstreamOffset =
+                offsetSyncStore.translateDownstream(topicPartition, 
upstreamOffset);
+            if (downstreamOffset.isPresent()) {
+                return Optional.of(new Checkpoint(group, 
renameTopicPartition(topicPartition),
                     upstreamOffset, downstreamOffset.getAsLong(), 
offsetAndMetadata.metadata()));
-        } else {
-            return Optional.empty();
+            }
         }
+        return Optional.empty();
     }
 
     SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 54fe678e73a..20735cd2334 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -161,4 +161,14 @@ public class MirrorCheckpointTaskTest {
         assertFalse(checkpoint1.isPresent());
         assertTrue(checkpoint2.isPresent());
     }
+
+    @Test
+    public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
+        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new 
OffsetSyncStoreTest.FakeOffsetSyncStore();
+        MirrorCheckpointTask mirrorCheckpointTask = new 
MirrorCheckpointTask("source1", "target2",
+            new DefaultReplicationPolicy(), offsetSyncStore, 
Collections.emptyMap(), Collections.emptyMap());
+        offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
+        Optional<Checkpoint> checkpoint = 
mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
+        assertFalse(checkpoint.isPresent());
+    }
 }

Reply via email to