This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 83411302794 add test to verify kafka sdf commit expansion (#32438)
83411302794 is described below
commit 834113027941b2203bea149cf60ed0c47a35f7a1
Author: Naireen Hussain <[email protected]>
AuthorDate: Thu Sep 19 04:12:57 2024 -0700
add test to verify kafka sdf commit expansion (#32438)
Co-authored-by: Naireen <[email protected]>
---
.../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 60 ++++++++++++++++++++++
1 file changed, 60 insertions(+)
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 6ee3d9d96ef..75b9cfc9a74 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.beam.runners.core.metrics.DistributionCell;
 import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -40,6 +41,9 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -62,6 +66,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -526,6 +531,18 @@ public class ReadFromKafkaDoFnTest {
     assertEquals(ProcessContinuation.stop(), result);
   }
 
+  @Test
+  public void testSDFCommitOffsetEnabled() {
+    OffSetsVisitor visitor = testCommittingOffsets(true);
+    Assert.assertEquals(true, visitor.foundOffsetTransform);
+  }
+
+  @Test
+  public void testSDFCommitOffsetNotEnabled() {
+    OffSetsVisitor visitor = testCommittingOffsets(false);
+    Assert.assertNotEquals(true, visitor.foundOffsetTransform);
+  }
+
   @Test
   public void testProcessElementWhenTopicPartitionIsStopped() throws Exception {
     MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
@@ -688,4 +705,47 @@ public class ReadFromKafkaDoFnTest {
       }
     }
   }
+
+  private OffSetsVisitor testCommittingOffsets(boolean enableOffsets) {
+
+    // Force Kafka read to use SDF implementation
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    ExperimentalOptions.addExperiment(
+        pipelineOptions.as(ExperimentalOptions.class), "use_sdf_read");
+
+    Pipeline p = Pipeline.create(pipelineOptions);
+    KafkaIO.Read<String, String> read =
+        KafkaIO.<String, String>read()
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializer(StringDeserializer.class)
+            .withConsumerConfigUpdates(
+                new ImmutableMap.Builder<String, Object>()
+                    .put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_1")
+                    .build())
+            .withBootstrapServers("bootstrap_server")
+            .withTopic("test-topic");
+
+    if (enableOffsets) {
+      read = read.commitOffsetsInFinalize();
+    }
+
+    p.apply(read.withoutMetadata());
+    OffSetsVisitor visitor = new OffSetsVisitor();
+    p.traverseTopologically(visitor);
+    return visitor;
+  }
+
+  static class OffSetsVisitor extends PipelineVisitor.Defaults {
+    boolean foundOffsetTransform = false;
+
+    @Override
+    public void visitValue(PValue value, Node producer) {
+      if (value instanceof PCollection) {
+        PCollection<?> pc = (PCollection<?>) value;
+        if (pc.getName().contains("KafkaCommitOffset")) {
+          foundOffsetTransform = true;
+        }
+      }
+    }
+  }
 }
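
For context, here is a minimal, hypothetical usage sketch of the behavior the new test pins down: when the SDF read path is active and commitOffsetsInFinalize() is requested, the KafkaIO expansion includes the KafkaCommitOffset step that OffSetsVisitor searches for. This sketch is not part of the commit; the class name, broker address, topic, and group id are placeholders, and the "use_sdf_read" experiment is set the same way the test sets it.

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCommitOffsetsSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Force the SDF-based Kafka read expansion, as the test above does.
    ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_sdf_read");

    Pipeline p = Pipeline.create(options);
    p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker
            .withTopic("my-topic") // placeholder topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // A consumer group id is required for the committed offsets to be meaningful.
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
            // Commit consumed offsets back to Kafka when bundles are finalized; with the
            // SDF read this adds the KafkaCommitOffset step to the pipeline expansion.
            .commitOffsetsInFinalize()
            .withoutMetadata());

    p.run().waitUntilFinish();
  }
}

Without commitOffsetsInFinalize(), the same pipeline expands without the KafkaCommitOffset step, which is what testSDFCommitOffsetNotEnabled asserts.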