This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cbccc9145d Kafka source offset-based deduplication. (#33596)
8cbccc9145d is described below

commit 8cbccc9145d2d07e4b1281618a488498f2827285
Author: Tom Stepp <[email protected]>
AuthorDate: Mon Feb 3 11:48:18 2025 -0800

    Kafka source offset-based deduplication. (#33596)
---
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java     | 21 ++++++++
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 53 +++++++++++++++----
 .../KafkaIOReadImplementationCompatibility.java    |  1 +
 .../org/apache/beam/sdk/io/kafka/KafkaIOUtils.java | 13 +++++
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    | 33 +++++++++++-
 .../beam/sdk/io/kafka/KafkaUnboundedSource.java    | 23 ++++++--
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     |  1 -
 ...KafkaIOReadImplementationCompatibilityTest.java | 15 +++++-
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 61 +++++++++++++++++-----
 .../sdk/io/kafka/upgrade/KafkaIOTranslation.java   | 10 ++++
 .../io/kafka/upgrade/KafkaIOTranslationTest.java   |  1 +
 11 files changed, 200 insertions(+), 32 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 966363e41f6..4271d6f72a0 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark {
   @SuppressWarnings("initialization") // Avro will set the fields by breaking 
abstraction
   private KafkaCheckpointMark() {} // for Avro
 
+  private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
+
   public KafkaCheckpointMark(
       List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> 
reader) {
     this.partitions = partitions;
@@ -66,6 +70,23 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark {
     return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) 
+ '}';
   }
 
+  @Override
+  public byte[] getOffsetLimit() {
+    if (!reader.isPresent()) {
+      throw new RuntimeException(
+          "KafkaCheckpointMark reader is not present while calling 
getOffsetLimit().");
+    }
+    if (!reader.get().offsetBasedDeduplicationSupported()) {
+      throw new RuntimeException(
+          "Unexpected getOffsetLimit() called while KafkaUnboundedReader not 
configured for offset deduplication.");
+    }
+
+    // KafkaUnboundedSource.split() must produce a 1:1 partition to split 
ratio.
+    checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
+    PartitionMark partition = partitions.get(/* index= */ 0);
+    return 
KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
+  }
+
   /**
    * A tuple to hold topic, partition, and offset that comprise the checkpoint 
for a single
    * partition.
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb7b3020c66..080cef5aaf0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -717,6 +717,9 @@ public class KafkaIO {
     @Pure
     public abstract int getRedistributeNumKeys();
 
+    @Pure
+    public abstract @Nullable Boolean getOffsetDeduplication();
+
     @Pure
     public abstract @Nullable Duration getWatchTopicPartitionDuration();
 
@@ -782,6 +785,8 @@ public class KafkaIO {
 
       abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
 
+      abstract Builder<K, V> setOffsetDeduplication(Boolean 
offsetDeduplication);
+
       abstract Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
@@ -886,11 +891,16 @@ public class KafkaIO {
           if (config.allowDuplicates != null) {
             builder.setAllowDuplicates(config.allowDuplicates);
           }
-
+          if (config.redistribute
+              && (config.allowDuplicates == null || !config.allowDuplicates)
+              && config.offsetDeduplication != null) {
+            builder.setOffsetDeduplication(config.offsetDeduplication);
+          }
         } else {
           builder.setRedistributed(false);
           builder.setRedistributeNumKeys(0);
           builder.setAllowDuplicates(false);
+          builder.setOffsetDeduplication(false);
         }
       }
 
@@ -959,6 +969,7 @@ public class KafkaIO {
         private Integer redistributeNumKeys;
         private Boolean redistribute;
         private Boolean allowDuplicates;
+        private Boolean offsetDeduplication;
 
         public void setConsumerConfig(Map<String, String> consumerConfig) {
           this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public class KafkaIO {
         public void setAllowDuplicates(Boolean allowDuplicates) {
           this.allowDuplicates = allowDuplicates;
         }
+
+        public void setOffsetDeduplication(Boolean offsetDeduplication) {
+          this.offsetDeduplication = offsetDeduplication;
+        }
       }
     }
 
@@ -1066,26 +1081,21 @@ public class KafkaIO {
      * Sets redistribute transform that hints to the runner to try to 
redistribute the work evenly.
      */
     public Read<K, V> withRedistribute() {
-      if (getRedistributeNumKeys() == 0 && isRedistributed()) {
-        LOG.warn("This will create a key per record, which is sub-optimal for 
most use cases.");
-      }
       return toBuilder().setRedistributed(true).build();
     }
 
     public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
-      if (!isAllowDuplicates()) {
-        LOG.warn("Setting this value without setting withRedistribute() will 
have no effect.");
-      }
       return toBuilder().setAllowDuplicates(allowDuplicates).build();
     }
 
     public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
-      checkState(
-          isRedistributed(),
-          "withRedistributeNumKeys is ignored if withRedistribute() is not 
enabled on the transform.");
       return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
     }
 
+    public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
+      return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
+    }
+
     /**
      * Internally sets a {@link java.util.regex.Pattern} of topics to read 
from. All the partitions
      * from each of the matching topics are read.
@@ -1541,6 +1551,9 @@ public class KafkaIO {
               ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         }
       }
+
+      checkRedistributeConfiguration();
+
       warnAboutUnsafeConfigurations(input);
 
       // Infer key/value coders if not specified explicitly
@@ -1573,6 +1586,26 @@ public class KafkaIO {
       return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, 
valueCoder));
     }
 
+    private void checkRedistributeConfiguration() {
+      if (getRedistributeNumKeys() == 0 && isRedistributed()) {
+        LOG.warn(
+            "withRedistribute without withRedistributeNumKeys will create a 
key per record, which is sub-optimal for most use cases.");
+      }
+      if (isAllowDuplicates() && !isRedistributed()) { // no-op unless redistributed
+        LOG.warn("withAllowDuplicates(true) without withRedistribute() will 
have no effect.");
+      }
+      if (getRedistributeNumKeys() > 0) {
+        checkState(
+            isRedistributed(),
+            "withRedistributeNumKeys is ignored if withRedistribute() is not 
enabled on the transform.");
+      }
+      if (getOffsetDeduplication() != null && getOffsetDeduplication()) {
+        checkState(
+            isRedistributed() && !isAllowDuplicates(),
+            "withOffsetDeduplication should only be used with withRedistribute 
and withAllowDuplicates(false).");
+      }
+    }
+
     private void warnAboutUnsafeConfigurations(PBegin input) {
       Long checkpointingInterval =
           input
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index 457e0003705..8e3c2b6ccc3 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -137,6 +137,7 @@ class KafkaIOReadImplementationCompatibility {
         return false;
       }
     },
+    OFFSET_DEDUPLICATION(LEGACY),
     ;
 
     private final @NonNull ImmutableSet<KafkaIOReadImplementation> 
supportedImplementations;
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
index 748418d1666..95f95000a58 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
@@ -19,11 +19,13 @@ package org.apache.beam.sdk.io.kafka;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ public final class KafkaIOUtils {
       return avg;
     }
   }
+
+  static final class OffsetBasedDeduplication {
+
+    static byte[] encodeOffset(long offset) {
+      return Longs.toByteArray(offset);
+    }
+
+    static byte[] getUniqueId(String topic, int partition, long offset) {
+      return (topic + "-" + partition + "-" + 
offset).getBytes(StandardCharsets.UTF_8);
+    }
+  }
 }
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 02af0f7d3df..6d5b706a987 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -66,6 +66,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -299,6 +300,30 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
     return curTimestamp;
   }
 
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (!offsetBasedDeduplicationSupported()) {
+      // Defer result to super if offset deduplication is not supported.
+      return super.getCurrentRecordId();
+    }
+    if (curRecord == null) {
+      throw new NoSuchElementException("KafkaUnboundedReader's curRecord is 
null.");
+    }
+    return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
+        curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+    if (!offsetBasedDeduplicationSupported()) {
+      throw new RuntimeException("UnboundedSource must enable offset-based 
deduplication.");
+    }
+    if (curRecord == null) {
+      throw new NoSuchElementException("KafkaUnboundedReader's curRecord is 
null.");
+    }
+    return 
KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
+  }
+
   @Override
   public long getSplitBacklogBytes() {
     long backlogBytes = 0;
@@ -313,6 +338,10 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
     return backlogBytes;
   }
 
+  public boolean offsetBasedDeduplicationSupported() {
+    return source.offsetBasedDeduplicationSupported();
+  }
+
   
////////////////////////////////////////////////////////////////////////////////////////////////
 
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -331,8 +360,8 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
   private final String name;
   private @Nullable Consumer<byte[], byte[]> consumer = null;
   private final List<PartitionState<K, V>> partitionStates;
-  private @Nullable KafkaRecord<K, V> curRecord = null;
-  private @Nullable Instant curTimestamp = null;
+  private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
+  private @MonotonicNonNull Instant curTimestamp = null;
   private Iterator<PartitionState<K, V>> curBatch = 
Collections.emptyIterator();
 
   private @Nullable Deserializer<K> keyDeserializerInstance = null;
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 9685d859b0a..fa01af02a35 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -113,10 +113,20 @@ class KafkaUnboundedSource<K, V> extends 
UnboundedSource<KafkaRecord<K, V>, Kafk
         partitions.size() > 0,
         "Could not find any partitions. Please check Kafka configuration and 
topic names");
 
-    int numSplits = Math.min(desiredNumSplits, partitions.size());
-    // XXX make all splits have the same # of partitions
-    while (partitions.size() % numSplits > 0) {
-      ++numSplits;
+    int numSplits;
+    if (offsetBasedDeduplicationSupported()) {
+      // Enforce 1:1 split to partition ratio for offset deduplication.
+      numSplits = partitions.size();
+      LOG.info(
+          "Offset-based deduplication is enabled for KafkaUnboundedSource. "
+              + "Forcing the number of splits to equal the number of total 
partitions: {}.",
+          numSplits);
+    } else {
+      numSplits = Math.min(desiredNumSplits, partitions.size());
+      // Make all splits have the same # of partitions.
+      while (partitions.size() % numSplits > 0) {
+        ++numSplits;
+      }
     }
     List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
 
@@ -177,6 +187,11 @@ class KafkaUnboundedSource<K, V> extends 
UnboundedSource<KafkaRecord<K, V>, Kafk
     return false;
   }
 
+  @Override
+  public boolean offsetBasedDeduplicationSupported() {
+    return spec.getOffsetDeduplication() != null && 
spec.getOffsetDeduplication();
+  }
+
   @Override
   public Coder<KafkaRecord<K, V>> getOutputCoder() {
     Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index ae1078e25cb..808058a4482 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -145,7 +145,6 @@ public class KafkaIOExternalTest {
     expansionService.expand(request, observer);
     ExpansionApi.ExpansionResponse result = observer.result;
     RunnerApi.PTransform transform = result.getTransform();
-    System.out.println("xxx : " + result.toString());
     assertThat(
         transform.getSubtransformsList(),
         Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
index 29c920bf9a6..8eda52bcec9 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransform;
+import static 
org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransformWithOffsetDedup;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
@@ -114,7 +115,8 @@ public class KafkaIOReadImplementationCompatibilityTest {
                 new ValueAsTimestampFn(),
                 false, /*redistribute*/
                 false, /*allowDuplicates*/
-                0)));
+                0, /*numKeys*/
+                null /*offsetDeduplication*/)));
     return p.run();
   }
 
@@ -139,6 +141,17 @@ public class KafkaIOReadImplementationCompatibilityTest {
     assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), 
containsInAnyOrder(expect));
   }
 
+  @Test
+  public void testReadTransformCreationWithOffsetDeduplication() {
+    p.apply(mkKafkaReadTransformWithOffsetDedup(1000, new 
ValueAsTimestampFn()));
+    PipelineResult r = p.run();
+    String[] expect =
+        KafkaIOTest.mkKafkaTopics.stream()
+            .map(topic -> String.format("kafka:`%s`.%s", 
KafkaIOTest.mkKafkaServers, topic))
+            .toArray(String[]::new);
+    assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), 
containsInAnyOrder(expect));
+  }
+
   @Test
   public void testReadTransformCreationWithSdfImplementationBoundProperty() {
     PipelineResult r =
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index e614320db15..6caa1868c99 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -391,7 +391,20 @@ public class KafkaIOTest {
         timestampFn,
         false, /*redistribute*/
         false, /*allowDuplicates*/
-        0);
+        0, /*numKeys*/
+        null /*offsetDeduplication*/);
+  }
+
+  static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithOffsetDedup(
+      int numElements, @Nullable SerializableFunction<KV<Integer, Long>, 
Instant> timestampFn) {
+    return mkKafkaReadTransform(
+        numElements,
+        numElements,
+        timestampFn,
+        true, /*redistribute*/
+        false, /*allowDuplicates*/
+        100, /*numKeys*/
+        true /*offsetDeduplication*/);
   }
 
   /**
@@ -404,7 +417,8 @@ public class KafkaIOTest {
       @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
       @Nullable Boolean redistribute,
       @Nullable Boolean withAllowDuplicates,
-      @Nullable Integer numKeys) {
+      @Nullable Integer numKeys,
+      @Nullable Boolean offsetDeduplication) {
 
     KafkaIO.Read<Integer, Long> reader =
         KafkaIO.<Integer, Long>read()
@@ -427,15 +441,15 @@ public class KafkaIOTest {
       reader = reader.withTimestampFn(timestampFn);
     }
 
-    if (redistribute) {
+    if (redistribute != null && redistribute) {
+      reader = reader.withRedistribute();
+      reader = reader.withAllowDuplicates(withAllowDuplicates);
       if (numKeys != null) {
-        reader =
-            reader
-                .withRedistribute()
-                .withAllowDuplicates(withAllowDuplicates)
-                .withRedistributeNumKeys(numKeys);
+        reader = reader.withRedistributeNumKeys(numKeys);
+      }
+      if (offsetDeduplication != null && offsetDeduplication) {
+        reader = reader.withOffsetDeduplication(offsetDeduplication);
       }
-      reader = reader.withRedistribute();
     }
     return reader;
   }
@@ -667,7 +681,8 @@ public class KafkaIOTest {
                         new ValueAsTimestampFn(),
                         true, /*redistribute*/
                         true, /*allowDuplicates*/
-                        0)
+                        0, /*numKeys*/
+                        null /*offsetDeduplication*/)
                     .commitOffsetsInFinalize()
                     .withConsumerConfigUpdates(
                         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, 
"group_id"))
@@ -693,7 +708,8 @@ public class KafkaIOTest {
                         new ValueAsTimestampFn(),
                         true, /*redistribute*/
                         false, /*allowDuplicates*/
-                        0)
+                        0, /*numKeys*/
+                        null /*offsetDeduplication*/)
                     .commitOffsetsInFinalize()
                     .withConsumerConfigUpdates(
                         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, 
"group_id"))
@@ -720,7 +736,8 @@ public class KafkaIOTest {
                         new ValueAsTimestampFn(),
                         false, /*redistribute*/
                         false, /*allowDuplicates*/
-                        0)
+                        0, /*numKeys*/
+                        null /*offsetDeduplication*/)
                     .withRedistributeNumKeys(100)
                     .commitOffsetsInFinalize()
                     .withConsumerConfigUpdates(
@@ -2091,7 +2108,8 @@ public class KafkaIOTest {
                         new ValueAsTimestampFn(),
                         false, /*redistribute*/
                         false, /*allowDuplicates*/
-                        0)
+                        0, /*numKeys*/
+                        null /*offsetDeduplication*/)
                     .withStartReadTime(new Instant(startTime))
                     .withoutMetadata())
             .apply(Values.create());
@@ -2100,6 +2118,20 @@ public class KafkaIOTest {
     p.run();
   }
 
+  @Test
+  public void testOffsetDeduplication() {
+    int numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransformWithOffsetDedup(numElements, new 
ValueAsTimestampFn())
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements, numElements, 0, numElements - 1);
+    p.run();
+  }
+
   @Rule public ExpectedException noMessagesException = 
ExpectedException.none();
 
   @Test
@@ -2121,7 +2153,8 @@ public class KafkaIOTest {
                     new ValueAsTimestampFn(),
                     false, /*redistribute*/
                     false, /*allowDuplicates*/
-                    0)
+                    0, /*numKeys*/
+                    null /*offsetDeduplication*/)
                 .withStartReadTime(new Instant(startTime))
                 .withoutMetadata())
         .apply(Values.create());
diff --git 
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
 
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 78337710bd2..298cd8e8338 100644
--- 
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ 
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -101,6 +101,7 @@ public class KafkaIOTranslation {
             .addBooleanField("redistribute")
             .addBooleanField("allows_duplicates")
             .addNullableInt32Field("redistribute_num_keys")
+            .addNullableBooleanField("offset_deduplication")
             .addNullableLogicalTypeField("watch_topic_partition_duration", new 
NanosDuration())
             .addByteArrayField("timestamp_policy_factory")
             .addNullableMapField("offset_consumer_config", FieldType.STRING, 
FieldType.BYTES)
@@ -221,6 +222,9 @@ public class KafkaIOTranslation {
       fieldValues.put("redistribute", transform.isRedistributed());
       fieldValues.put("redistribute_num_keys", 
transform.getRedistributeNumKeys());
       fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
+      if (transform.getOffsetDeduplication() != null) {
+        fieldValues.put("offset_deduplication", 
transform.getOffsetDeduplication());
+      }
       return Row.withSchema(schema).withFieldValues(fieldValues).build();
     }
 
@@ -349,6 +353,12 @@ public class KafkaIOTranslation {
             }
           }
         }
+        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, 
"2.63.0") >= 0) {
+          @Nullable Boolean offsetDeduplication = 
configRow.getValue("offset_deduplication");
+          if (offsetDeduplication != null) {
+            transform = transform.withOffsetDeduplication(offsetDeduplication);
+          }
+        }
         Duration maxReadTime = configRow.getValue("max_read_time");
         if (maxReadTime != null) {
           transform =
diff --git 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index 095702a5c6f..140022de2ea 100644
--- 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -65,6 +65,7 @@ public class KafkaIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put("getStartReadTime", "start_read_time");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getStopReadTime", "stop_read_time");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getRedistributeNumKeys", 
"redistribute_num_keys");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getOffsetDeduplication", 
"offset_deduplication");
     READ_TRANSFORM_SCHEMA_MAPPING.put(
         "isCommitOffsetsInFinalizeEnabled", 
"is_commit_offset_finalize_enabled");
     READ_TRANSFORM_SCHEMA_MAPPING.put("isDynamicRead", "is_dynamic_read");

Reply via email to