[
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288547#comment-16288547
]
ASF GitHub Bot commented on BEAM-2421:
--------------------------------------
lukecwik closed pull request #4234: [BEAM-2421] Replaces BoundedSource with a
composite transform when using Fn API
URL: https://github.com/apache/beam/pull/4234
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a6500924149..ddad43fe6ec 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -91,6 +92,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
@@ -132,6 +134,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
@@ -420,6 +423,13 @@ public static DataflowRunner fromOptions(PipelineOptions
options) {
new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsIterable.class, this)));
}
+ // Expands into Reshuffle and single-output ParDo, so has to be before the
overrides below.
+ if (hasExperiment(options, "beam_fn_api")) {
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Bounded.class),
+ new FnApiBoundedReadOverrideFactory()));
+ }
overridesBuilder
.add(
PTransformOverride.of(
@@ -1185,7 +1195,7 @@ private StreamingFnApiCreate(
public final PCollection<T> expand(PBegin input) {
try {
PCollection<T> pc = Pipeline
- .applyTransform(input, new Impulse(IsBounded.BOUNDED))
+ .applyTransform(input, new Impulse())
.apply(ParDo.of(DecodeAndEmitDoFn
.fromIterable(transform.getElements(),
originalOutput.getCoder())));
pc.setCoder(originalOutput.getCoder());
@@ -1206,7 +1216,7 @@ private StreamingFnApiCreate(
throws IOException {
ImmutableList.Builder<byte[]> allElementsBytes =
ImmutableList.builder();
for (T element : elements) {
- byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+ byte[] bytes = encodeToByteArray(elemCoder, element);
allElementsBytes.add(bytes);
}
return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
@@ -1244,16 +1254,16 @@ public void processElement(ProcessContext context)
throws IOException {
/** The Dataflow specific override for the impulse primitive. */
private static class Impulse extends PTransform<PBegin, PCollection<byte[]>>
{
- private final IsBounded isBounded;
-
- private Impulse(IsBounded isBounded) {
- this.isBounded = isBounded;
+ private Impulse() {
}
@Override
public PCollection<byte[]> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), isBounded,
ByteArrayCoder.of());
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ ByteArrayCoder.of());
}
private static class Translator implements TransformTranslator<Impulse> {
@@ -1265,8 +1275,21 @@ public void translate(Impulse transform,
TranslationContext context) {
stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION,
"_starting_signal/");
stepContext.addOutput(context.getOutput(transform));
} else {
- throw new UnsupportedOperationException(
- "Impulse source for batch pipelines has not been defined.");
+ StepTranslationContext stepContext = context.addStep(transform,
"ParallelRead");
+ stepContext.addInput(PropertyNames.FORMAT, "impulse");
+ WindowedValue.FullWindowedValueCoder<byte[]> coder =
+ WindowedValue.getFullCoder(
+ context.getOutput(transform).getCoder(),
GlobalWindow.Coder.INSTANCE);
+ byte[] encodedImpulse;
+ try {
+ encodedImpulse =
+ encodeToByteArray(coder, WindowedValue.valueInGlobalWindow(new
byte[0]));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ stepContext.addInput(
+ PropertyNames.IMPULSE_ELEMENT,
byteArrayToJsonString(encodedImpulse));
+ stepContext.addOutput(context.getOutput(transform));
}
}
}
@@ -1438,6 +1461,65 @@ public StreamingBoundedRead(Read.Bounded<T> transform) {
}
}
+ private static class FnApiBoundedReadOverrideFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>,
Read.Bounded<T>> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>>
getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new
FnApiBoundedRead<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+ /**
+ * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded
Read.Bounded} for the
+ * Dataflow runner in streaming mode.
+ */
+ private static class FnApiBoundedRead<T> extends PTransform<PBegin,
PCollection<T>> {
+ private final BoundedSource<T> source;
+
+ public FnApiBoundedRead(Read.Bounded<T> transform) {
+ this.source = transform.getSource();
+ }
+
+ @Override
+ public final PCollection<T> expand(PBegin input) {
+ return input
+ .apply(new Impulse())
+ .apply(
+ ParDo.of(
+ new DoFn<byte[], BoundedSource<T>>() {
+ @ProcessElement
+ public void process(ProcessContext c) throws Exception {
+ for (BoundedSource<T> split :
+ source.split(64L << 20, c.getPipelineOptions())) {
+ c.output(split);
+ }
+ }
+ }))
+ .setCoder((Coder<BoundedSource<T>>) SerializableCoder.of((Class)
BoundedSource.class))
+ .apply(Reshuffle.<BoundedSource<T>>viaRandomKey())
+ .apply(
+ ParDo.of(
+ new DoFn<BoundedSource<T>, T>() {
+ @ProcessElement
+ public void process(ProcessContext c) throws Exception {
+ BoundedSource.BoundedReader<T> reader =
+ c.element().createReader(c.getPipelineOptions());
+ for (boolean more = reader.start(); more; more =
reader.advance()) {
+ c.outputWithTimestamp(reader.getCurrent(),
reader.getCurrentTimestamp());
+ }
+ }
+ }))
+ .setCoder(source.getOutputCoder());
+ }
+ }
/**
* A marker {@link DoFn} for writing the contents of a {@link PCollection}
to a streaming
* {@link PCollectionView} backend implementation.
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index 55e0c4ebff9..cdc87bf9343 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -64,4 +64,5 @@
public static final String VALUE = "value";
public static final String DISPLAY_DATA = "display_data";
public static final String RESTRICTION_CODER = "restriction_coder";
+ public static final String IMPULSE_ELEMENT = "impulse_element";
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Migrate Apache Beam to use impulse primitive as the only root primitive
> -----------------------------------------------------------------------
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
> Issue Type: Improvement
> Components: beam-model
> Reporter: Luke Cwik
> Assignee: Eugene Kirpichov
> Labels: portability
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)