This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b2d21acf8b Fix Kafka with Redistribute and commits enabled (#32344)
1b2d21acf8b is described below

commit 1b2d21acf8b88c14cd9395f6e69c5802e65786f0
Author: Naireen Hussain <[email protected]>
AuthorDate: Tue Sep 17 01:50:22 2024 -0700

    Fix Kafka with Redistribute and commits enabled (#32344)
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  30 ++++--
 ...KafkaIOReadImplementationCompatibilityTest.java |   8 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 118 +++++++++++++++++++--
 3 files changed, 136 insertions(+), 20 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 1fd3e3e044e..0f28edf19dd 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -890,7 +890,6 @@ public class KafkaIO {
           builder.setRedistributeNumKeys(0);
           builder.setAllowDuplicates(false);
         }
-        System.out.println("xxx builder service" + builder.toString());
       }
 
       private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> 
deserializer) {
@@ -1697,11 +1696,12 @@ public class KafkaIO {
         }
 
         if (kafkaRead.isRedistributed()) {
-          // fail here instead.
-          checkArgument(
-              kafkaRead.isCommitOffsetsInFinalizeEnabled(),
-              "commitOffsetsInFinalize() can't be enabled with 
isRedistributed");
+          if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && 
kafkaRead.isAllowDuplicates()) {
+            LOG.warn(
+                "Offsets committed due to usage of commitOffsetsInFinalize() 
and may not capture all work processed due to use of withRedistribute() with 
duplicates enabled");
+          }
           PCollection<KafkaRecord<K, V>> output = 
input.getPipeline().apply(transform);
+
           if (kafkaRead.getRedistributeNumKeys() == 0) {
             return output.apply(
                 "Insert Redistribute",
@@ -1797,7 +1797,7 @@ public class KafkaIO {
             return pcol.apply(
                 "Insert Redistribute with Shards",
                 Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                    .withAllowDuplicates(true)
+                    .withAllowDuplicates(kafkaRead.isAllowDuplicates())
                     .withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
           }
         }
@@ -2654,6 +2654,12 @@ public class KafkaIO {
         if (getRedistributeNumKeys() == 0) {
           LOG.warn("This will create a key per record, which is sub-optimal 
for most use cases.");
         }
+        if ((isCommitOffsetEnabled() || configuredKafkaCommit()) && 
isAllowDuplicates()) {
+          LOG.warn(
+              "Either auto_commit is set, or commitOffsetEnabled is enabled 
(or both), but since "
+                  + "withRestribute() is enabled with allow duplicates, the 
runner may have additional work processed that "
+                  + "is ahead of the current checkpoint");
+        }
       }
 
       if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == 
null) {
@@ -2687,8 +2693,7 @@ public class KafkaIO {
                             .getSchemaCoder(KafkaSourceDescriptor.class),
                         recordCoder));
 
-        boolean applyCommitOffsets =
-            isCommitOffsetEnabled() && !configuredKafkaCommit() && 
!isRedistribute();
+        boolean applyCommitOffsets = isCommitOffsetEnabled() && 
!configuredKafkaCommit();
         if (!applyCommitOffsets) {
           return outputWithDescriptor
               .apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() 
{}).via(KV::getValue))
@@ -2710,6 +2715,15 @@ public class KafkaIO {
           if (Comparators.lexicographical(Comparator.<String>naturalOrder())
                   .compare(requestedVersion, targetVersion)
               < 0) {
+            // Redistribute is not allowed with commits prior to 2.60.0, since 
there is a Reshuffle
+            // prior to the redistribute. The reshuffle will occur before 
offsets are committed and
+            // before outputting KafkaRecords. Adding a redistribute 
afterwards doesn't provide
+            // additional performance benefit.
+            checkArgument(
+                !isRedistribute(),
+                "Can not enable isRedistribute() while committing offsets 
prior to "
+                    + String.join(".", targetVersion));
+
             return expand259Commits(
                 outputWithDescriptor, recordCoder, 
input.getPipeline().getSchemaRegistry());
           }
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
index 74f1e83fd86..29c920bf9a6 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
@@ -108,7 +108,13 @@ public class KafkaIOReadImplementationCompatibilityTest {
       Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>> 
kafkaReadDecorator) {
     p.apply(
         kafkaReadDecorator.apply(
-            mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false, 
0)));
+            mkKafkaReadTransform(
+                1000,
+                null,
+                new ValueAsTimestampFn(),
+                false, /*redistribute*/
+                false, /*allowDuplicates*/
+                0)));
     return p.run();
   }
 
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 1fe1147a739..25ff6dad124 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -88,6 +88,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -381,7 +382,13 @@ public class KafkaIOTest {
 
   static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements, @Nullable SerializableFunction<KV<Integer, Long>, 
Instant> timestampFn) {
-    return mkKafkaReadTransform(numElements, numElements, timestampFn, false, 
0);
+    return mkKafkaReadTransform(
+        numElements,
+        numElements,
+        timestampFn,
+        false, /*redistribute*/
+        false, /*allowDuplicates*/
+        0);
   }
 
   /**
@@ -393,6 +400,7 @@ public class KafkaIOTest {
       @Nullable Integer maxNumRecords,
       @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
       @Nullable Boolean redistribute,
+      @Nullable Boolean withAllowDuplicates,
       @Nullable Integer numKeys) {
 
     KafkaIO.Read<Integer, Long> reader =
@@ -408,13 +416,21 @@ public class KafkaIOTest {
       reader = reader.withMaxNumRecords(maxNumRecords);
     }
 
+    if (withAllowDuplicates == null) {
+      withAllowDuplicates = false;
+    }
+
     if (timestampFn != null) {
       reader = reader.withTimestampFn(timestampFn);
     }
 
     if (redistribute) {
       if (numKeys != null) {
-        reader = reader.withRedistribute().withRedistributeNumKeys(numKeys);
+        reader =
+            reader
+                .withRedistribute()
+                .withAllowDuplicates(withAllowDuplicates)
+                .withRedistributeNumKeys(numKeys);
       }
       reader = reader.withRedistribute();
     }
@@ -628,17 +644,47 @@ public class KafkaIOTest {
   }
 
   @Test
-  public void testCommitOffsetsInFinalizeAndRedistributeErrors() {
-    thrown.expect(Exception.class);
-    thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with 
isRedistributed");
+  public void warningsWithAllowDuplicatesEnabledAndCommitOffsets() {
+    int numElements = 1000;
 
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransform(
+                        numElements,
+                        numElements,
+                        new ValueAsTimestampFn(),
+                        true, /*redistribute*/
+                        true, /*allowDuplicates*/
+                        0)
+                    .commitOffsetsInFinalize()
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, 
"group_id"))
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+
+    kafkaIOExpectedLogs.verifyWarn(
+        "Offsets committed due to usage of commitOffsetsInFinalize() and may 
not capture all work processed due to use of withRedistribute() with duplicates 
enabled");
+  }
+
+  @Test
+  public void noWarningsWithNoAllowDuplicatesAndCommitOffsets() {
     int numElements = 1000;
 
     PCollection<Long> input =
         p.apply(
-                mkKafkaReadTransform(numElements, numElements, new 
ValueAsTimestampFn(), true, 0)
+                mkKafkaReadTransform(
+                        numElements,
+                        numElements,
+                        new ValueAsTimestampFn(),
+                        true, /*redistribute*/
+                        false, /*allowDuplicates*/
+                        0)
+                    .commitOffsetsInFinalize()
                     .withConsumerConfigUpdates(
-                        
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, 
"group_id"))
                     .withoutMetadata())
             .apply(Values.create());
 
@@ -648,13 +694,25 @@ public class KafkaIOTest {
 
   @Test
   public void testNumKeysIgnoredWithRedistributeNotEnabled() {
+    thrown.expect(Exception.class);
+    thrown.expectMessage(
+        "withRedistributeNumKeys is ignored if withRedistribute() is not 
enabled on the transform");
+
     int numElements = 1000;
 
     PCollection<Long> input =
         p.apply(
-                mkKafkaReadTransform(numElements, numElements, new 
ValueAsTimestampFn(), false, 0)
+                mkKafkaReadTransform(
+                        numElements,
+                        numElements,
+                        new ValueAsTimestampFn(),
+                        false, /*redistribute*/
+                        false, /*allowDuplicates*/
+                        0)
+                    .withRedistributeNumKeys(100)
+                    .commitOffsetsInFinalize()
                     .withConsumerConfigUpdates(
-                        
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, 
"group_id"))
                     .withoutMetadata())
             .apply(Values.create());
 
@@ -663,6 +721,32 @@ public class KafkaIOTest {
     p.run();
   }
 
+  @Test
+  public void testDisableRedistributeKafkaOffsetLegacy() {
+    thrown.expect(Exception.class);
+    thrown.expectMessage(
+        "Can not enable isRedistribute() while committing offsets prior to 
2.60.0");
+    
p.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.59.0");
+
+    p.apply(
+            Create.of(
+                KafkaSourceDescriptor.of(
+                    new TopicPartition("topic", 1),
+                    null,
+                    null,
+                    null,
+                    null,
+                    ImmutableList.of("8.8.8.8:9092"))))
+        .apply(
+            KafkaIO.<Long, Long>readSourceDescriptors()
+                .withKeyDeserializer(LongDeserializer.class)
+                .withValueDeserializer(LongDeserializer.class)
+                .withRedistribute()
+                .withProcessingTime()
+                .commitOffsets());
+    p.run();
+  }
+
   @Test
   public void testUnreachableKafkaBrokers() {
     // Expect an exception when the Kafka brokers are not reachable on the 
workers.
@@ -1982,7 +2066,13 @@ public class KafkaIOTest {
 
     PCollection<Long> input =
         p.apply(
-                mkKafkaReadTransform(numElements, maxNumRecords, new 
ValueAsTimestampFn(), false, 0)
+                mkKafkaReadTransform(
+                        numElements,
+                        maxNumRecords,
+                        new ValueAsTimestampFn(),
+                        false, /*redistribute*/
+                        false, /*allowDuplicates*/
+                        0)
                     .withStartReadTime(new Instant(startTime))
                     .withoutMetadata())
             .apply(Values.create());
@@ -2006,7 +2096,13 @@ public class KafkaIOTest {
     int startTime = numElements / 20;
 
     p.apply(
-            mkKafkaReadTransform(numElements, numElements, new 
ValueAsTimestampFn(), false, 0)
+            mkKafkaReadTransform(
+                    numElements,
+                    numElements,
+                    new ValueAsTimestampFn(),
+                    false, /*redistribute*/
+                    false, /*allowDuplicates*/
+                    0)
                 .withStartReadTime(new Instant(startTime))
                 .withoutMetadata())
         .apply(Values.create());

Reply via email to