This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 83411302794 add test to verify kafka sdf commit expansion (#32438)
83411302794 is described below

commit 834113027941b2203bea149cf60ed0c47a35f7a1
Author: Naireen Hussain <[email protected]>
AuthorDate: Thu Sep 19 04:12:57 2024 -0700

    add test to verify kafka sdf commit expansion (#32438)
    
    Co-authored-by: Naireen <[email protected]>
---
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java   | 60 ++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 6ee3d9d96ef..75b9cfc9a74 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.beam.runners.core.metrics.DistributionCell;
 import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -40,6 +41,9 @@ import 
org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -62,6 +66,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -526,6 +531,18 @@ public class ReadFromKafkaDoFnTest {
     assertEquals(ProcessContinuation.stop(), result);
   }
 
+  @Test
+  public void testSDFCommitOffsetEnabled() {
+    OffSetsVisitor visitor = testCommittingOffsets(true);
+    Assert.assertEquals(true, visitor.foundOffsetTransform);
+  }
+
+  @Test
+  public void testSDFCommitOffsetNotEnabled() {
+    OffSetsVisitor visitor = testCommittingOffsets(false);
+    Assert.assertNotEquals(true, visitor.foundOffsetTransform);
+  }
+
   @Test
   public void testProcessElementWhenTopicPartitionIsStopped() throws Exception 
{
     MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
@@ -688,4 +705,47 @@ public class ReadFromKafkaDoFnTest {
       }
     }
   }
+
+  private OffSetsVisitor testCommittingOffsets(boolean enableOffsets) {
+
+    // Force Kafka read to use SDF implementation
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    ExperimentalOptions.addExperiment(
+        pipelineOptions.as(ExperimentalOptions.class), "use_sdf_read");
+
+    Pipeline p = Pipeline.create(pipelineOptions);
+    KafkaIO.Read<String, String> read =
+        KafkaIO.<String, String>read()
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializer(StringDeserializer.class)
+            .withConsumerConfigUpdates(
+                new ImmutableMap.Builder<String, Object>()
+                    .put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_1")
+                    .build())
+            .withBootstrapServers("bootstrap_server")
+            .withTopic("test-topic");
+
+    if (enableOffsets) {
+      read = read.commitOffsetsInFinalize();
+    }
+
+    p.apply(read.withoutMetadata());
+    OffSetsVisitor visitor = new OffSetsVisitor();
+    p.traverseTopologically(visitor);
+    return visitor;
+  }
+
+  static class OffSetsVisitor extends PipelineVisitor.Defaults {
+    boolean foundOffsetTransform = false;
+
+    @Override
+    public void visitValue(PValue value, Node producer) {
+      if (value instanceof PCollection) {
+        PCollection<?> pc = (PCollection<?>) value;
+        if (pc.getName().contains("KafkaCommitOffset")) {
+          foundOffsetTransform = true;
+        }
+      }
+    }
+  }
 }

Reply via email to