This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 10775fefc28 Provide kafka unbounded reader to checkpoint mark when
offset based deduplication is supported. (#34669)
10775fefc28 is described below
commit 10775fefc28378fd69a0f8cd76c19d9adae5a96f
Author: Tom Stepp <[email protected]>
AuthorDate: Fri Apr 18 21:41:52 2025 -0700
Provide kafka unbounded reader to checkpoint mark when offset based
deduplication is supported. (#34669)
---
.../main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java | 3 +--
.../main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 4 +++-
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 4271d6f72a0..1ef0f42bf8e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -35,6 +35,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
*/
@DefaultCoder(AvroCoder.class)
public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
+ private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
private List<PartitionMark> partitions;
@@ -44,8 +45,6 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro
- private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
-
public KafkaCheckpointMark(
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 074bba54ac2..4680304d5a6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -268,7 +268,9 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
p.nextOffset,
p.lastWatermark.getMillis()))
.collect(Collectors.toList()),
- source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty());
+ source.getSpec().isCommitOffsetsInFinalizeEnabled() || offsetBasedDeduplicationSupported()
+ ? Optional.of(this)
+ : Optional.empty());
}
@Override