This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7fb8f7b Replaces BoundedSource with a composite transform when using
Fn API
7fb8f7b is described below
commit 7fb8f7b6b3280ff991bd90e39006e72e4126697e
Author: Eugene Kirpichov <[email protected]>
AuthorDate: Mon Oct 9 11:30:27 2017 -0700
Replaces BoundedSource with a composite transform when using Fn API
---
.../beam/runners/dataflow/DataflowRunner.java | 100 +++++++++++++++++++--
.../beam/runners/dataflow/util/PropertyNames.java | 1 +
2 files changed, 92 insertions(+), 9 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a650092..ddad43f 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,7 @@ import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -91,6 +92,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
@@ -132,6 +134,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
@@ -420,6 +423,13 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsIterable.class, this)));
}
+ // Expands into Reshuffle and single-output ParDo, so has to be before the
overrides below.
+ if (hasExperiment(options, "beam_fn_api")) {
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Bounded.class),
+ new FnApiBoundedReadOverrideFactory()));
+ }
overridesBuilder
.add(
PTransformOverride.of(
@@ -1185,7 +1195,7 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
public final PCollection<T> expand(PBegin input) {
try {
PCollection<T> pc = Pipeline
- .applyTransform(input, new Impulse(IsBounded.BOUNDED))
+ .applyTransform(input, new Impulse())
.apply(ParDo.of(DecodeAndEmitDoFn
.fromIterable(transform.getElements(),
originalOutput.getCoder())));
pc.setCoder(originalOutput.getCoder());
@@ -1206,7 +1216,7 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
throws IOException {
ImmutableList.Builder<byte[]> allElementsBytes =
ImmutableList.builder();
for (T element : elements) {
- byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+ byte[] bytes = encodeToByteArray(elemCoder, element);
allElementsBytes.add(bytes);
}
return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
@@ -1244,16 +1254,16 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
/** The Dataflow specific override for the impulse primitive. */
private static class Impulse extends PTransform<PBegin, PCollection<byte[]>>
{
- private final IsBounded isBounded;
-
- private Impulse(IsBounded isBounded) {
- this.isBounded = isBounded;
+ private Impulse() {
}
@Override
public PCollection<byte[]> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), isBounded,
ByteArrayCoder.of());
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ ByteArrayCoder.of());
}
private static class Translator implements TransformTranslator<Impulse> {
@@ -1265,8 +1275,21 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION,
"_starting_signal/");
stepContext.addOutput(context.getOutput(transform));
} else {
- throw new UnsupportedOperationException(
- "Impulse source for batch pipelines has not been defined.");
+ StepTranslationContext stepContext = context.addStep(transform,
"ParallelRead");
+ stepContext.addInput(PropertyNames.FORMAT, "impulse");
+ WindowedValue.FullWindowedValueCoder<byte[]> coder =
+ WindowedValue.getFullCoder(
+ context.getOutput(transform).getCoder(),
GlobalWindow.Coder.INSTANCE);
+ byte[] encodedImpulse;
+ try {
+ encodedImpulse =
+ encodeToByteArray(coder, WindowedValue.valueInGlobalWindow(new
byte[0]));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ stepContext.addInput(
+ PropertyNames.IMPULSE_ELEMENT,
byteArrayToJsonString(encodedImpulse));
+ stepContext.addOutput(context.getOutput(transform));
}
}
}
@@ -1438,6 +1461,65 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
}
}
+ private static class FnApiBoundedReadOverrideFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>,
Read.Bounded<T>> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>>
getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new
FnApiBoundedRead<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+ /**
+ * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded
Read.Bounded} for the
+ * Dataflow runner in streaming mode.
+ */
+ private static class FnApiBoundedRead<T> extends PTransform<PBegin,
PCollection<T>> {
+ private final BoundedSource<T> source;
+
+ public FnApiBoundedRead(Read.Bounded<T> transform) {
+ this.source = transform.getSource();
+ }
+
+ @Override
+ public final PCollection<T> expand(PBegin input) {
+ return input
+ .apply(new Impulse())
+ .apply(
+ ParDo.of(
+ new DoFn<byte[], BoundedSource<T>>() {
+ @ProcessElement
+ public void process(ProcessContext c) throws Exception {
+ for (BoundedSource<T> split :
+ source.split(64L << 20, c.getPipelineOptions())) {
+ c.output(split);
+ }
+ }
+ }))
+ .setCoder((Coder<BoundedSource<T>>) SerializableCoder.of((Class)
BoundedSource.class))
+ .apply(Reshuffle.<BoundedSource<T>>viaRandomKey())
+ .apply(
+ ParDo.of(
+ new DoFn<BoundedSource<T>, T>() {
+ @ProcessElement
+ public void process(ProcessContext c) throws Exception {
+ BoundedSource.BoundedReader<T> reader =
+ c.element().createReader(c.getPipelineOptions());
+ for (boolean more = reader.start(); more; more =
reader.advance()) {
+ c.outputWithTimestamp(reader.getCurrent(),
reader.getCurrentTimestamp());
+ }
+ }
+ }))
+ .setCoder(source.getOutputCoder());
+ }
+ }
/**
* A marker {@link DoFn} for writing the contents of a {@link PCollection}
to a streaming
* {@link PCollectionView} backend implementation.
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index 55e0c4e..cdc87bf 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -64,4 +64,5 @@ public class PropertyNames {
public static final String VALUE = "value";
public static final String DISPLAY_DATA = "display_data";
public static final String RESTRICTION_CODER = "restriction_coder";
+ public static final String IMPULSE_ELEMENT = "impulse_element";
}
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.