This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 10775fefc28 Provide kafka unbounded reader to checkpoint mark when offset based deduplication is supported. (#34669)
10775fefc28 is described below

commit 10775fefc28378fd69a0f8cd76c19d9adae5a96f
Author: Tom Stepp <[email protected]>
AuthorDate: Fri Apr 18 21:41:52 2025 -0700

    Provide kafka unbounded reader to checkpoint mark when offset based deduplication is supported. (#34669)
---
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java   | 3 +--
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java  | 4 +++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 4271d6f72a0..1ef0f42bf8e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -35,6 +35,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
  */
 @DefaultCoder(AvroCoder.class)
 public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
+  private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
 
   private List<PartitionMark> partitions;
 
@@ -44,8 +45,6 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark {
   @SuppressWarnings("initialization") // Avro will set the fields by breaking 
abstraction
   private KafkaCheckpointMark() {} // for Avro
 
-  private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
-
   public KafkaCheckpointMark(
       List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> 
reader) {
     this.partitions = partitions;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 074bba54ac2..4680304d5a6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -268,7 +268,9 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
                         p.nextOffset,
                         p.lastWatermark.getMillis()))
             .collect(Collectors.toList()),
-        source.getSpec().isCommitOffsetsInFinalizeEnabled() ? 
Optional.of(this) : Optional.empty());
+        source.getSpec().isCommitOffsetsInFinalizeEnabled() || 
offsetBasedDeduplicationSupported()
+            ? Optional.of(this)
+            : Optional.empty());
   }
 
   @Override

Reply via email to