This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8cbccc9145d Kafka source offset-based deduplication. (#33596)
8cbccc9145d is described below
commit 8cbccc9145d2d07e4b1281618a488498f2827285
Author: Tom Stepp <[email protected]>
AuthorDate: Mon Feb 3 11:48:18 2025 -0800
Kafka source offset-based deduplication. (#33596)
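
    A minimal usage sketch of the API added here (broker address, topic, and
    deserializers are illustrative placeholders, not part of the change); per the
    new checkRedistributeConfiguration() check, offset deduplication is only
    valid together with withRedistribute() and withAllowDuplicates(false):

        KafkaIO.Read<byte[], byte[]> read =
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("localhost:9092")             // placeholder
                .withTopic("my_topic")                              // placeholder
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withRedistribute()                                 // required for offset dedup
                .withAllowDuplicates(false)                         // required for offset dedup
                .withRedistributeNumKeys(100)
                .withOffsetDeduplication(true);                     // new in this commit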
---
.../beam/sdk/io/kafka/KafkaCheckpointMark.java | 21 ++++++++
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 53 +++++++++++++++----
.../KafkaIOReadImplementationCompatibility.java | 1 +
.../org/apache/beam/sdk/io/kafka/KafkaIOUtils.java | 13 +++++
.../beam/sdk/io/kafka/KafkaUnboundedReader.java | 33 +++++++++++-
.../beam/sdk/io/kafka/KafkaUnboundedSource.java | 23 ++++++--
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 1 -
...KafkaIOReadImplementationCompatibilityTest.java | 15 +++++-
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 61 +++++++++++++++++-----
.../sdk/io/kafka/upgrade/KafkaIOTranslation.java | 10 ++++
.../io/kafka/upgrade/KafkaIOTranslationTest.java | 1 +
11 files changed, 200 insertions(+), 32 deletions(-)
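The encoding scheme itself is small (see KafkaIOUtils.OffsetBasedDeduplication below): the per-record unique ID is "topic-partition-offset" as UTF-8 bytes, and the checkpoint's offset limit is the offset's big-endian 8-byte encoding. A sketch with illustrative values:

    // Unique ID for topic "my_topic", partition 3, offset 42 -> UTF-8 bytes of "my_topic-3-42".
    byte[] uniqueId = ("my_topic" + "-" + 3 + "-" + 42L).getBytes(StandardCharsets.UTF_8);
    // Offset limit: 8-byte big-endian encoding via Guava's Longs.
    byte[] offsetLimit = Longs.toByteArray(42L);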
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 966363e41f6..4271d6f72a0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
  @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro
+ private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
+
public KafkaCheckpointMark(
      List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
@@ -66,6 +70,23 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
    return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}
+  @Override
+  public byte[] getOffsetLimit() {
+    if (!reader.isPresent()) {
+      throw new RuntimeException(
+          "KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
+    }
+    if (!reader.get().offsetBasedDeduplicationSupported()) {
+      throw new RuntimeException(
+          "Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
+    }
+
+    // KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
+    checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
+    PartitionMark partition = partitions.get(/* index= */ 0);
+    return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
+  }
+
  /**
   * A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
   * partition.
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb7b3020c66..080cef5aaf0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -717,6 +717,9 @@ public class KafkaIO {
@Pure
public abstract int getRedistributeNumKeys();
+ @Pure
+ public abstract @Nullable Boolean getOffsetDeduplication();
+
@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();
@@ -782,6 +785,8 @@ public class KafkaIO {
abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
+    abstract Builder<K, V> setOffsetDeduplication(Boolean offsetDeduplication);
+
abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);
@@ -886,11 +891,16 @@ public class KafkaIO {
if (config.allowDuplicates != null) {
builder.setAllowDuplicates(config.allowDuplicates);
}
-
+ if (config.redistribute
+ && (config.allowDuplicates == null || !config.allowDuplicates)
+ && config.offsetDeduplication != null) {
+ builder.setOffsetDeduplication(config.offsetDeduplication);
+ }
} else {
builder.setRedistributed(false);
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
+ builder.setOffsetDeduplication(false);
}
}
@@ -959,6 +969,7 @@ public class KafkaIO {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
+ private Boolean offsetDeduplication;
public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public class KafkaIO {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}
+
+ public void setOffsetDeduplication(Boolean offsetDeduplication) {
+ this.offsetDeduplication = offsetDeduplication;
+ }
}
}
@@ -1066,26 +1081,21 @@ public class KafkaIO {
    * Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
*/
public Read<K, V> withRedistribute() {
- if (getRedistributeNumKeys() == 0 && isRedistributed()) {
- LOG.warn("This will create a key per record, which is sub-optimal for
most use cases.");
- }
return toBuilder().setRedistributed(true).build();
}
public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
- if (!isAllowDuplicates()) {
- LOG.warn("Setting this value without setting withRedistribute() will
have no effect.");
- }
return toBuilder().setAllowDuplicates(allowDuplicates).build();
}
public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
- checkState(
- isRedistributed(),
- "withRedistributeNumKeys is ignored if withRedistribute() is not
enabled on the transform.");
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}
+ public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
+ return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
+ }
+
/**
    * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
@@ -1541,6 +1551,9 @@ public class KafkaIO {
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
}
+
+ checkRedistributeConfiguration();
+
warnAboutUnsafeConfigurations(input);
// Infer key/value coders if not specified explicitly
@@ -1573,6 +1586,26 @@ public class KafkaIO {
      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}
+  private void checkRedistributeConfiguration() {
+    if (getRedistributeNumKeys() == 0 && isRedistributed()) {
+      LOG.warn(
+          "withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
+    }
+    if (isAllowDuplicates()) {
+      LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
+    }
+    if (getRedistributeNumKeys() > 0) {
+      checkState(
+          isRedistributed(),
+          "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
+    }
+    if (getOffsetDeduplication() != null && getOffsetDeduplication()) {
+      checkState(
+          isRedistributed() && !isAllowDuplicates(),
+          "withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
+    }
+  }
+
private void warnAboutUnsafeConfigurations(PBegin input) {
Long checkpointingInterval =
input
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index 457e0003705..8e3c2b6ccc3 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -137,6 +137,7 @@ class KafkaIOReadImplementationCompatibility {
return false;
}
},
+ OFFSET_DEDUPLICATION(LEGACY),
;
  private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
index 748418d1666..95f95000a58 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
@@ -19,11 +19,13 @@ package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ public final class KafkaIOUtils {
return avg;
}
}
+
+ static final class OffsetBasedDeduplication {
+
+ static byte[] encodeOffset(long offset) {
+ return Longs.toByteArray(offset);
+ }
+
+ static byte[] getUniqueId(String topic, int partition, long offset) {
+      return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
+ }
+ }
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 02af0f7d3df..6d5b706a987 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -66,6 +66,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -299,6 +300,30 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
return curTimestamp;
}
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (!offsetBasedDeduplicationSupported()) {
+      // Defer result to super if offset deduplication is not supported.
+      return super.getCurrentRecordId();
+    }
+    if (curRecord == null) {
+      throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
+    }
+    return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
+        curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+    if (!offsetBasedDeduplicationSupported()) {
+      throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
+    }
+    if (curRecord == null) {
+      throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
+    }
+    return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
+  }
+
@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -313,6 +338,10 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
return backlogBytes;
}
+ public boolean offsetBasedDeduplicationSupported() {
+ return source.offsetBasedDeduplicationSupported();
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////
  private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -331,8 +360,8 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
private final String name;
private @Nullable Consumer<byte[], byte[]> consumer = null;
private final List<PartitionState<K, V>> partitionStates;
- private @Nullable KafkaRecord<K, V> curRecord = null;
- private @Nullable Instant curTimestamp = null;
+ private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
+ private @MonotonicNonNull Instant curTimestamp = null;
  private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
private @Nullable Deserializer<K> keyDeserializerInstance = null;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 9685d859b0a..fa01af02a35 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -113,10 +113,20 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
        partitions.size() > 0,
        "Could not find any partitions. Please check Kafka configuration and topic names");
- int numSplits = Math.min(desiredNumSplits, partitions.size());
- // XXX make all splits have the same # of partitions
- while (partitions.size() % numSplits > 0) {
- ++numSplits;
+ int numSplits;
+ if (offsetBasedDeduplicationSupported()) {
+ // Enforce 1:1 split to partition ratio for offset deduplication.
+ numSplits = partitions.size();
+      LOG.info(
+          "Offset-based deduplication is enabled for KafkaUnboundedSource. "
+              + "Forcing the number of splits to equal the number of total partitions: {}.",
+          numSplits);
+ } else {
+ numSplits = Math.min(desiredNumSplits, partitions.size());
+ // Make all splits have the same # of partitions.
+ while (partitions.size() % numSplits > 0) {
+ ++numSplits;
+ }
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
@@ -177,6 +187,11 @@ class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, Kafk
return false;
}
+ @Override
+ public boolean offsetBasedDeduplicationSupported() {
+    return spec.getOffsetDeduplication() != null && spec.getOffsetDeduplication();
+ }
+
@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index ae1078e25cb..808058a4482 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -145,7 +145,6 @@ public class KafkaIOExternalTest {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();
- System.out.println("xxx : " + result.toString());
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
index 29c920bf9a6..8eda52bcec9 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransform;
+import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransformWithOffsetDedup;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
@@ -114,7 +115,8 @@ public class KafkaIOReadImplementationCompatibilityTest {
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
- 0)));
+ 0, /*numKeys*/
+ null /*offsetDeduplication*/)));
return p.run();
}
@@ -139,6 +141,17 @@ public class KafkaIOReadImplementationCompatibilityTest {
    assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
}
+  @Test
+  public void testReadTransformCreationWithOffsetDeduplication() {
+    p.apply(mkKafkaReadTransformWithOffsetDedup(1000, new ValueAsTimestampFn()));
+    PipelineResult r = p.run();
+    String[] expect =
+        KafkaIOTest.mkKafkaTopics.stream()
+            .map(topic -> String.format("kafka:`%s`.%s", KafkaIOTest.mkKafkaServers, topic))
+            .toArray(String[]::new);
+    assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
+  }
+
@Test
public void testReadTransformCreationWithSdfImplementationBoundProperty() {
PipelineResult r =
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index e614320db15..6caa1868c99 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -391,7 +391,20 @@ public class KafkaIOTest {
timestampFn,
false, /*redistribute*/
false, /*allowDuplicates*/
- 0);
+ 0, /*numKeys*/
+ null /*offsetDeduplication*/);
+ }
+
+  static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithOffsetDedup(
+      int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
+ return mkKafkaReadTransform(
+ numElements,
+ numElements,
+ timestampFn,
+ true, /*redistribute*/
+ false, /*allowDuplicates*/
+ 100, /*numKeys*/
+ true /*offsetDeduplication*/);
}
/**
@@ -404,7 +417,8 @@ public class KafkaIOTest {
@Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
@Nullable Boolean redistribute,
@Nullable Boolean withAllowDuplicates,
- @Nullable Integer numKeys) {
+ @Nullable Integer numKeys,
+ @Nullable Boolean offsetDeduplication) {
KafkaIO.Read<Integer, Long> reader =
KafkaIO.<Integer, Long>read()
@@ -427,15 +441,15 @@ public class KafkaIOTest {
reader = reader.withTimestampFn(timestampFn);
}
- if (redistribute) {
+ if (redistribute != null && redistribute) {
+ reader = reader.withRedistribute();
+ reader = reader.withAllowDuplicates(withAllowDuplicates);
if (numKeys != null) {
- reader =
- reader
- .withRedistribute()
- .withAllowDuplicates(withAllowDuplicates)
- .withRedistributeNumKeys(numKeys);
+ reader = reader.withRedistributeNumKeys(numKeys);
+ }
+ if (offsetDeduplication != null && offsetDeduplication) {
+ reader.withOffsetDeduplication(offsetDeduplication);
}
- reader = reader.withRedistribute();
}
return reader;
}
@@ -667,7 +681,8 @@ public class KafkaIOTest {
new ValueAsTimestampFn(),
true, /*redistribute*/
true, /*allowDuplicates*/
- 0)
+ 0, /*numKeys*/
+ null /*offsetDeduplication*/)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
                      ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
@@ -693,7 +708,8 @@ public class KafkaIOTest {
new ValueAsTimestampFn(),
true, /*redistribute*/
false, /*allowDuplicates*/
- 0)
+ 0, /*numKeys*/
+ null /*offsetDeduplication*/)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
                      ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
@@ -720,7 +736,8 @@ public class KafkaIOTest {
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
- 0)
+ 0, /*numKeys*/
+ null /*offsetDeduplication*/)
.withRedistributeNumKeys(100)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
@@ -2091,7 +2108,8 @@ public class KafkaIOTest {
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
- 0)
+ 0, /*numKeys*/
+ null /*offsetDeduplication*/)
.withStartReadTime(new Instant(startTime))
.withoutMetadata())
.apply(Values.create());
@@ -2100,6 +2118,20 @@ public class KafkaIOTest {
p.run();
}
+  @Test
+  public void testOffsetDeduplication() {
+    int numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransformWithOffsetDedup(numElements, new ValueAsTimestampFn())
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements, numElements, 0, numElements - 1);
+    p.run();
+  }
+
  @Rule public ExpectedException noMessagesException = ExpectedException.none();
@Test
@@ -2121,7 +2153,8 @@ public class KafkaIOTest {
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
- 0)
+ 0, /*numKeys*/
+ null /*offsetDeduplication*/)
.withStartReadTime(new Instant(startTime))
.withoutMetadata())
.apply(Values.create());
diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 78337710bd2..298cd8e8338 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -101,6 +101,7 @@ public class KafkaIOTranslation {
.addBooleanField("redistribute")
.addBooleanField("allows_duplicates")
.addNullableInt32Field("redistribute_num_keys")
+ .addNullableBooleanField("offset_deduplication")
            .addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
.addByteArrayField("timestamp_policy_factory")
            .addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,9 @@ public class KafkaIOTranslation {
fieldValues.put("redistribute", transform.isRedistributed());
fieldValues.put("redistribute_num_keys",
transform.getRedistributeNumKeys());
fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
+      if (transform.getOffsetDeduplication() != null) {
+        fieldValues.put("offset_deduplication", transform.getOffsetDeduplication());
+      }
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}
@@ -349,6 +353,12 @@ public class KafkaIOTranslation {
}
}
}
+      if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.63.0") >= 0) {
+        @Nullable Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
+        if (offsetDeduplication != null) {
+          transform = transform.withOffsetDeduplication(offsetDeduplication);
+        }
+      }
Duration maxReadTime = configRow.getValue("max_read_time");
if (maxReadTime != null) {
transform =
diff --git a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index 095702a5c6f..140022de2ea 100644
--- a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -65,6 +65,7 @@ public class KafkaIOTranslationTest {
READ_TRANSFORM_SCHEMA_MAPPING.put("getStartReadTime", "start_read_time");
READ_TRANSFORM_SCHEMA_MAPPING.put("getStopReadTime", "stop_read_time");
READ_TRANSFORM_SCHEMA_MAPPING.put("getRedistributeNumKeys",
"redistribute_num_keys");
+ READ_TRANSFORM_SCHEMA_MAPPING.put("getOffsetDeduplication",
"offset_deduplication");
READ_TRANSFORM_SCHEMA_MAPPING.put(
"isCommitOffsetsInFinalizeEnabled",
"is_commit_offset_finalize_enabled");
READ_TRANSFORM_SCHEMA_MAPPING.put("isDynamicRead", "is_dynamic_read");