This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 9fcc955  Fix getSideInputs
9fcc955 is described below

commit 9fcc955f5722dcc7899f6ec91b9432444a8dd46c
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Tue Feb 19 17:01:04 2019 +0100

    Fix getSideInputs
---
 .../translation/TranslationContext.java            | 11 ++++++
 .../CreatePCollectionViewTranslatorBatch.java      | 40 ++++++++++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java    |  1 +
 .../translation/batch/PipelineTranslatorBatch.java |  4 +++
 .../translation/batch/ParDoTest.java               | 27 +++++++++++++++
 5 files changed, 83 insertions(+)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 6711b1c..013ef75 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.SparkConf;
@@ -61,6 +62,8 @@ public class TranslationContext {
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
   private SparkSession sparkSession;
 
+  private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
+
   public TranslationContext(SparkPipelineOptions options) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.setMaster(options.getSparkMaster());
@@ -73,6 +76,7 @@ public class TranslationContext {
     this.serializablePipelineOptions = new SerializablePipelineOptions(options);
     this.datasets = new HashMap<>();
     this.leaves = new HashSet<>();
+    this.broadcastDataSets = new HashMap<>();
   }
 
   public SparkSession getSparkSession() {
@@ -128,6 +132,13 @@ public class TranslationContext {
     }
   }
 
+  public <ViewT, ElemT> void setSideInputDataset(
+      PCollectionView<ViewT> value, Dataset<WindowedValue<ElemT>> set) {
+    if (!broadcastDataSets.containsKey(value)) {
+      broadcastDataSets.put(value, set);
+    }
+  }
+
   // --------------------------------------------------------------------------------------------
   //  PCollections methods
   // --------------------------------------------------------------------------------------------
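
This commit adds only the setter; nothing reads the stored datasets back yet. For orientation, a minimal sketch of the accessor a consumer such as ParDoTranslatorBatch would eventually need (hypothetical: the method name and the unchecked cast are assumptions, not part of this change):

    @SuppressWarnings("unchecked")
    public <ViewT, ElemT> Dataset<WindowedValue<ElemT>> getSideInputDataSet(
        PCollectionView<ViewT> value) {
      // Hypothetical accessor: return the dataset registered for this view in
      // setSideInputDataset(), cast back to its windowed element type.
      return (Dataset<WindowedValue<ElemT>>) broadcastDataSets.get(value);
    }
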
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java
new file mode 100644
index 0000000..df4d252
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java
@@ -0,0 +1,40 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.sql.Dataset;
+
+import java.io.IOException;
+
+class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+    implements TransformTranslator<PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
+
+  @Override
+  public void translateTransform(
+      PTransform<PCollection<ElemT>, PCollection<ElemT>> transform, TranslationContext context) {
+
+    Dataset<WindowedValue<ElemT>> inputDataSet = context.getDataset(context.getInput());
+
+    @SuppressWarnings("unchecked") AppliedPTransform<
+        PCollection<ElemT>, PCollection<ElemT>,
+        PTransform<PCollection<ElemT>, PCollection<ElemT>>>
+        application =
+        (AppliedPTransform<
+            PCollection<ElemT>, PCollection<ElemT>,
+            PTransform<PCollection<ElemT>, PCollection<ElemT>>>)
+            context.getCurrentTransform();
+    PCollectionView<ViewT> input;
+    try {
+      input = CreatePCollectionViewTranslation.getView(application);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    context.setSideInputDataset(input, inputDataSet);
+  }
+}
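
For reference, this translator is exercised whenever a pipeline materializes a view through the standard Beam API, for example:

    // Illustrative only; Create and View are the standard
    // org.apache.beam.sdk.transforms APIs, as used in ParDoTest below.
    PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
    PCollectionView<List<Integer>> view = numbers.apply(View.asList());
    // View.asList() expands to a CreatePCollectionView step; its URN,
    // PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, is what
    // PipelineTranslatorBatch maps to this translator.
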
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 7314298..fa208f3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -75,6 +75,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
 
     // TODO: add support of SideInputs
     List<PCollectionView<?>> sideInputs = getSideInputs(context);
+    System.out.println("sideInputs = " + sideInputs);
     final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0;
     checkState(!hasSideInputs, "SideInputs are not supported for the moment.");
 
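
The println above is a temporary debug trace; side inputs are still rejected by the checkState that follows. A sketch of where this is heading (hypothetical, and assuming the getSideInputDataSet accessor sketched above for TranslationContext):

    for (PCollectionView<?> sideInput : sideInputs) {
      // Hypothetical: fetch the dataset that CreatePCollectionViewTranslatorBatch
      // registered for this view, to be broadcast and exposed to the DoFn
      // through a SideInputReader once side-input support lands.
      Dataset<?> sideDataset = context.getSideInputDataSet(sideInput);
    }
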
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 6715407..8424d43 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -67,6 +67,10 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
 
     TRANSFORM_TRANSLATORS.put(
         PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+
+    TRANSFORM_TRANSLATORS.put(
+        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
+        new CreatePCollectionViewTranslatorBatch());
   }
 
   public PipelineTranslatorBatch(SparkPipelineOptions options) {
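
For context, translators are looked up from this map by transform URN while the pipeline graph is visited; a minimal sketch of that dispatch (assumed here, not shown in this diff):

    TransformTranslator<?> getTransformTranslator(PTransform<?, ?> transform) {
      // urnForTransformOrNull returns null for transforms with no known URN;
      // returning null lets the visitor descend into the composite instead.
      String urn = PTransformTranslation.urnForTransformOrNull(transform);
      return (urn == null) ? null : TRANSFORM_TRANSLATORS.get(urn);
    }
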
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
index 48350df..c028dc0 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
+import java.util.List;
+
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -26,7 +28,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -83,4 +87,27 @@ public class ParDoTest implements Serializable {
                 }));
     pipeline.run();
   }
+
+  @Test
+  public void testSideInput() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    final PCollectionView<List<Integer>> sideInput =
+        input.apply(View.asList());
+
+    input.apply(
+        ParDo.of(
+            new DoFn<Integer, Integer>() {
+              @ProcessElement
+              public void processElement(ProcessContext context) {
+                List<Integer> list = context.sideInput(sideInput);
+
+                Integer val = context.element();
+                context.output(val);
+                System.out.println("ParDo1: val = " + val + ", sideInput = " + list);
+              }
+            })
+            .withSideInputs(sideInput));
+
+    pipeline.run();
+  }
 }
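
The new test only prints from inside the DoFn, so it exercises translation without asserting anything about the output. A stronger variant (a sketch using org.apache.beam.sdk.testing.PAssert; not part of this commit) would capture the ParDo result and assert on it before running:

    PCollection<Integer> output =
        input.apply(ParDo.of(/* the DoFn above */).withSideInputs(sideInput));
    PAssert.that(output).containsInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    pipeline.run();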
