This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 286d7f36480d79ad54f2e92f0b8af8c4ba716621 Author: Etienne Chauchot <[email protected]> AuthorDate: Thu Nov 29 16:02:11 2018 +0100 Add Flatten transformation translator --- .../translation/TranslationContext.java | 4 +++ ...latorBatch.java => FlattenTranslatorBatch.java} | 35 ++++++++++++++++++++-- .../translation/batch/PipelineTranslatorBatch.java | 2 +- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 98f77af..3c29867 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -83,6 +83,10 @@ public class TranslationContext { // -------------------------------------------------------------------------------------------- // Datasets methods // -------------------------------------------------------------------------------------------- + @SuppressWarnings("unchecked") + public <T> Dataset<T> emptyDataset() { + return (Dataset<T>) sparkSession.emptyDataset(Encoders.bean(Void.class)); + } @SuppressWarnings("unchecked") public <T> Dataset<WindowedValue<T>> getDataset(PValue value) { diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java similarity index 55% rename from 
runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java index 87a250e..2739e83 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java @@ -17,16 +17,47 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Map; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.spark.sql.Dataset; -class FlattenPCollectionTranslatorBatch<T> +class FlattenTranslatorBatch<T> implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> { @Override public void translateTransform( - PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {} + PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) { + Map<TupleTag<?>, PValue> inputs = context.getInputs(); + Dataset<WindowedValue<T>> result = null; + + if (inputs.isEmpty()) { + result = context.emptyDataset(); + } else { + for (PValue pValue : inputs.values()) { + checkArgument( + pValue instanceof PCollection, + 
"Got non-PCollection input to flatten: %s of type %s", + pValue, + pValue.getClass().getSimpleName()); + @SuppressWarnings("unchecked") + PCollection<T> pCollection = (PCollection<T>) pValue; + Dataset<WindowedValue<T>> current = context.getDataset(pCollection); + if (result == null) { + result = current; + } else { + result = result.union(current); + } + } + } + context.putDataset(context.getOutput(), result); + } } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java index 318d74c..26f1b9c 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java @@ -56,7 +56,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator { TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch()); TRANSFORM_TRANSLATORS.put( - PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch()); + PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch()); TRANSFORM_TRANSLATORS.put( PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
