This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 51ee89a598b474108363bdb9677842eb21148a82 Author: csolidum <[email protected]> AuthorDate: Wed Jan 4 03:02:52 2023 -0800 KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052) Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]> --- .../apache/kafka/connect/mirror/MirrorCheckpointTask.java | 14 ++++++++------ .../kafka/connect/mirror/MirrorCheckpointTaskTest.java | 10 ++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 959961812ea..29483c126b3 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -188,14 +188,16 @@ public class MirrorCheckpointTask extends SourceTask { Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) { - long upstreamOffset = offsetAndMetadata.offset(); - OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset); - if (downstreamOffset.isPresent()) { - return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition), + if (offsetAndMetadata != null) { + long upstreamOffset = offsetAndMetadata.offset(); + OptionalLong downstreamOffset = + offsetSyncStore.translateDownstream(topicPartition, upstreamOffset); + if (downstreamOffset.isPresent()) { + return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition), upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata())); - } else { - return Optional.empty(); + } } + return Optional.empty(); } SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java index 54fe678e73a..20735cd2334 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java @@ -161,4 +161,14 @@ public class MirrorCheckpointTaskTest { assertFalse(checkpoint1.isPresent()); assertTrue(checkpoint2.isPresent()); } + + @Test + public void testNoCheckpointForTopicWithNullOffsetAndMetadata() { + OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore(); + MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", + new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap()); + offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L); + Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null); + assertFalse(checkpoint.isPresent()); + } }
