This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 286d7f36480d79ad54f2e92f0b8af8c4ba716621
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Nov 29 16:02:11 2018 +0100

    Add Flatten transformation translator
---
 .../translation/TranslationContext.java            |  4 +++
 ...latorBatch.java => FlattenTranslatorBatch.java} | 35 ++++++++++++++++++++--
 .../translation/batch/PipelineTranslatorBatch.java |  2 +-
 3 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 98f77af..3c29867 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -83,6 +83,10 @@ public class TranslationContext {
   // --------------------------------------------------------------------------------------------
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<T> emptyDataset() {
+    return (Dataset<T>) sparkSession.emptyDataset(Encoders.bean(Void.class));
+  }
 
   @SuppressWarnings("unchecked")
   public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
similarity index 55%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
index 87a250e..2739e83 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -17,16 +17,47 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.sql.Dataset;
 
-class FlattenPCollectionTranslatorBatch<T>
+class FlattenTranslatorBatch<T>
     implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {}
+      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {
+    Map<TupleTag<?>, PValue> inputs = context.getInputs();
+    Dataset<WindowedValue<T>> result = null;
+
+    if (inputs.isEmpty()) {
+      result = context.emptyDataset();
+    } else {
+      for (PValue pValue : inputs.values()) {
+        checkArgument(
+            pValue instanceof PCollection,
+            "Got non-PCollection input to flatten: %s of type %s",
+            pValue,
+            pValue.getClass().getSimpleName());
+        @SuppressWarnings("unchecked")
+        PCollection<T> pCollection = (PCollection<T>) pValue;
+        Dataset<WindowedValue<T>> current = context.getDataset(pCollection);
+        if (result == null) {
+          result = current;
+        } else {
+          result = result.union(current);
+        }
+      }
+    }
+    context.putDataset(context.getOutput(), result);
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 318d74c..26f1b9c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -56,7 +56,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
     TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
-        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
         PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());

Reply via email to