This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f2cc926 Eliminate beam_fn_api from KafkaIO expansion
new 3b85447 Merge pull request #14419 from [BEAM-12114] Eliminate
beam_fn_api from KafkaIO expansion
f2cc926 is described below
commit f2cc92663ad8ae685183e076cdb652d8fc3ba4e0
Author: Boyuan Zhang <[email protected]>
AuthorDate: Fri Apr 2 15:20:53 2021 -0700
Eliminate beam_fn_api from KafkaIO expansion
---
runners/google-cloud-dataflow-java/build.gradle | 1 +
.../beam/runners/dataflow/DataflowRunner.java | 4 +
.../SparkStructuredStreamingRunner.java | 3 +
runners/spark/spark_runner.gradle | 1 +
.../org/apache/beam/runners/spark/SparkRunner.java | 3 +
.../beam/runners/spark/SparkRunnerDebugger.java | 3 +
sdks/java/io/kafka/build.gradle | 1 +
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 176 +++++++++++++++------
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 4 +-
9 files changed, 147 insertions(+), 49 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/build.gradle
b/runners/google-cloud-dataflow-java/build.gradle
index 290ea94..476e8c5 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -72,6 +72,7 @@ dependencies {
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:extensions:google-cloud-platform-core")
+ compile project(":sdks:java:io:kafka")
compile project(":sdks:java:io:google-cloud-platform")
compile project(":runners:core-construction-java")
compile library.java.avro
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c81631e..0a79cd9 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -115,6 +115,7 @@ import
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageId
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -491,6 +492,9 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
new StreamingPubsubIOWriteOverrideFactory(this)));
}
}
+ if (useUnifiedWorker(options)) {
+ overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
+ }
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.writeWithRunnerDeterminedSharding(),
diff --git
a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index f08c36b..5d8230e 100644
---
a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++
b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -37,12 +37,14 @@ import
org.apache.beam.runners.spark.structuredstreaming.translation.batch.Pipel
import
org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
@@ -193,6 +195,7 @@ public final class SparkStructuredStreamingRunner
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(),
"use_deprecated_read")) {
+ pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}
diff --git a/runners/spark/spark_runner.gradle
b/runners/spark/spark_runner.gradle
index fe2393e..38519d0 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -152,6 +152,7 @@ dependencies {
compile project(":runners:core-java")
compile project(":runners:java-fn-execution")
compile project(":runners:java-job-service")
+ compile project(":sdks:java:io:kafka")
compile project(":sdks:java:extensions:google-cloud-platform-core")
compile library.java.jackson_annotations
compile library.java.slf4j_api
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 364e549..60a113e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -48,6 +48,7 @@ import
org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvanci
import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
@@ -66,6 +67,7 @@ import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
@@ -181,6 +183,7 @@ public final class SparkRunner extends
PipelineRunner<SparkPipelineResult> {
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(),
"use_deprecated_read")) {
+ pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 33b8408..37d9d54 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -26,9 +26,11 @@ import
org.apache.beam.runners.spark.translation.TransformTranslator;
import
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;
@@ -85,6 +87,7 @@ public final class SparkRunnerDebugger extends
PipelineRunner<SparkPipelineResul
|| ExperimentalOptions.hasExperiment(
pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(pipeline.getOptions(),
"use_deprecated_read")) {
+ pipeline.replaceAll(ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE));
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
}
JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index df76a82..7a4ca21 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -48,6 +48,7 @@ kafkaVersions.each{k,v ->
configurations.create("kafkaVersion$k")}
dependencies {
compile library.java.vendored_guava_26_0_jre
compile project(path: ":sdks:java:core", configuration: "shadow")
+ compile project(":runners:core-construction-java")
compile project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
compile library.java.avro
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 06d4a5d..8b6058c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -33,8 +33,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -51,6 +54,9 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.DoFn;
@@ -72,6 +78,7 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -1209,67 +1216,144 @@ public class KafkaIO {
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);
- // The Read will be expanded into SDF transform when "beam_fn_api" is
enabled.
- if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(),
"beam_fn_api")
- || ExperimentalOptions.hasExperiment(
+ // When reading from an unbounded source in a bounded manner, we do not
actually go through Read or SDF.
+ if (ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(),
"beam_fn_api_use_deprecated_read")
+ || ExperimentalOptions.hasExperiment(
+ input.getPipeline().getOptions(), "use_deprecated_read")
|| getMaxNumRecords() < Long.MAX_VALUE
|| getMaxReadTime() != null) {
+ return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder,
valueCoder));
+ }
+ return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder,
valueCoder));
+ }
+
+ /**
+ * A {@link PTransformOverride} for runners to swap {@link
ReadFromKafkaViaSDF} to legacy Kafka
+ * read if a runner does not have good support for executing unbounded
Splittable DoFns.
+ */
+ @Internal
+ public static final PTransformOverride KAFKA_READ_OVERRIDE =
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(ReadFromKafkaViaSDF.class),
+ new KafkaReadOverrideFactory<>());
+
+ private static class KafkaReadOverrideFactory<K, V>
+ implements PTransformOverrideFactory<
+ PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>>
{
+
+ @Override
+ public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>>
getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>,
ReadFromKafkaViaSDF<K, V>>
+ transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(),
+ new ReadFromKafkaViaUnbounded<>(
+ transform.getTransform().kafkaRead,
+ transform.getTransform().keyCoder,
+ transform.getTransform().valueCoder));
+ }
+
+ @Override
+ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K,
V>> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+ private static class ReadFromKafkaViaUnbounded<K, V>
+ extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+ Read<K, V> kafkaRead;
+ Coder<K> keyCoder;
+ Coder<V> valueCoder;
+
+ ReadFromKafkaViaUnbounded(Read<K, V> kafkaRead, Coder<K> keyCoder,
Coder<V> valueCoder) {
+ this.kafkaRead = kafkaRead;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ }
+
+ @Override
+ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
// Handles unbounded source to bounded conversion if maxNumRecords or
maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
org.apache.beam.sdk.io.Read.from(
-
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+ kafkaRead
+ .toBuilder()
+ .setKeyCoder(keyCoder)
+ .setValueCoder(valueCoder)
+ .build()
+ .makeSource());
PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform =
unbounded;
- if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+ if (kafkaRead.getMaxNumRecords() < Long.MAX_VALUE ||
kafkaRead.getMaxReadTime() != null) {
transform =
-
unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+ unbounded
+ .withMaxReadTime(kafkaRead.getMaxReadTime())
+ .withMaxNumRecords(kafkaRead.getMaxNumRecords());
}
return input.getPipeline().apply(transform);
}
- ReadSourceDescriptors<K, V> readTransform =
- ReadSourceDescriptors.<K, V>read()
- .withConsumerConfigOverrides(getConsumerConfig())
- .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
- .withConsumerFactoryFn(getConsumerFactoryFn())
- .withKeyDeserializerProvider(getKeyDeserializerProvider())
- .withValueDeserializerProvider(getValueDeserializerProvider())
- .withManualWatermarkEstimator()
- .withTimestampPolicyFactory(getTimestampPolicyFactory())
- .withCheckStopReadingFn(getCheckStopReadingFn());
- if (isCommitOffsetsInFinalizeEnabled()) {
- readTransform = readTransform.commitOffsets();
+ }
+
+ static class ReadFromKafkaViaSDF<K, V>
+ extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+ Read<K, V> kafkaRead;
+ Coder<K> keyCoder;
+ Coder<V> valueCoder;
+
+ ReadFromKafkaViaSDF(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V>
valueCoder) {
+ this.kafkaRead = kafkaRead;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
}
- PCollection<KafkaSourceDescriptor> output;
- if (isDynamicRead()) {
- output =
- input
- .getPipeline()
- .apply(Impulse.create())
- .apply(
- MapElements.into(
- TypeDescriptors.kvs(
- new TypeDescriptor<byte[]>() {}, new
TypeDescriptor<byte[]>() {}))
- .via(element -> KV.of(element, element)))
- .apply(
- ParDo.of(
- new WatchKafkaTopicPartitionDoFn(
- getWatchTopicPartitionDuration(),
- getConsumerFactoryFn(),
- getCheckStopReadingFn(),
- getConsumerConfig(),
- getStartReadTime())));
- } else {
- output =
- input
- .getPipeline()
- .apply(Impulse.create())
- .apply(ParDo.of(new GenerateKafkaSourceDescriptor(this)));
+ @Override
+ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
+ ReadSourceDescriptors<K, V> readTransform =
+ ReadSourceDescriptors.<K, V>read()
+ .withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
+
.withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
+ .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
+
.withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
+
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
+ .withManualWatermarkEstimator()
+
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
+ .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
+ if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
+ readTransform = readTransform.commitOffsets();
+ }
+ PCollection<KafkaSourceDescriptor> output;
+ if (kafkaRead.isDynamicRead()) {
+ output =
+ input
+ .getPipeline()
+ .apply(Impulse.create())
+ .apply(
+ MapElements.into(
+ TypeDescriptors.kvs(
+ new TypeDescriptor<byte[]>() {}, new
TypeDescriptor<byte[]>() {}))
+ .via(element -> KV.of(element, element)))
+ .apply(
+ ParDo.of(
+ new WatchKafkaTopicPartitionDoFn(
+ kafkaRead.getWatchTopicPartitionDuration(),
+ kafkaRead.getConsumerFactoryFn(),
+ kafkaRead.getCheckStopReadingFn(),
+ kafkaRead.getConsumerConfig(),
+ kafkaRead.getStartReadTime())));
+
+ } else {
+ output =
+ input
+ .getPipeline()
+ .apply(Impulse.create())
+ .apply(ParDo.of(new
GenerateKafkaSourceDescriptor(kafkaRead)));
+ }
+ return
output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
}
- return
output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
}
/**
@@ -1798,10 +1882,6 @@ public class KafkaIO {
@Override
public PCollection<KafkaRecord<K, V>>
expand(PCollection<KafkaSourceDescriptor> input) {
- checkArgument(
- ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(),
"beam_fn_api"),
- "The ReadSourceDescriptors can only used when beam_fn_api is
enabled.");
-
checkArgument(getKeyDeserializerProvider() != null,
"withKeyDeserializer() is required");
checkArgument(getValueDeserializerProvider() != null,
"withValueDeserializer() is required");
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 108d858..bb15e42 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -128,8 +128,10 @@ public class KafkaIOExternalTest {
assertThat(transform.getInputsCount(), Matchers.is(0));
assertThat(transform.getOutputsCount(), Matchers.is(1));
- RunnerApi.PTransform kafkaComposite =
+ RunnerApi.PTransform kafkaReadComposite =
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
+ RunnerApi.PTransform kafkaComposite =
+
result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0));
assertThat(
kafkaComposite.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*Impulse.*")));