This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 9fcc955  Fix getSideInputs
9fcc955 is described below

commit 9fcc955f5722dcc7899f6ec91b9432444a8dd46c
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Tue Feb 19 17:01:04 2019 +0100

    Fix getSideInputs
---
 .../translation/TranslationContext.java            | 11 ++++++
 .../CreatePCollectionViewTranslatorBatch.java      | 40 ++++++++++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java    |  1 +
 .../translation/batch/PipelineTranslatorBatch.java |  4 +++
 .../translation/batch/ParDoTest.java               | 27 +++++++++++++++++++
 5 files changed, 83 insertions(+)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 6711b1c..013ef75 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.SparkConf;
@@ -61,6 +62,8 @@ public class TranslationContext {
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
   private SparkSession sparkSession;
 
+  private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
+
   public TranslationContext(SparkPipelineOptions options) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.setMaster(options.getSparkMaster());
@@ -73,6 +76,7 @@ public class TranslationContext {
     this.serializablePipelineOptions = new SerializablePipelineOptions(options);
     this.datasets = new HashMap<>();
     this.leaves = new HashSet<>();
+    this.broadcastDataSets = new HashMap<>();
   }
 
   public SparkSession getSparkSession() {
@@ -128,6 +132,13 @@ public class TranslationContext {
     }
   }
 
+  public <ViewT, ElemT> void setSideInputDataset(
+      PCollectionView<ViewT> value, Dataset<WindowedValue<ElemT>> set) {
+    if (!broadcastDataSets.containsKey(value)) {
+      broadcastDataSets.put(value, set);
+    }
+  }
+
   // --------------------------------------------------------------------------------------------
   //  PCollections methods
   // --------------------------------------------------------------------------------------------
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java
new file mode 100644
index 0000000..df4d252
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java
@@ -0,0 +1,40 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.sql.Dataset;
+
+import java.io.IOException;
+
+class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+    implements TransformTranslator<PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
+
+  @Override
+  public void translateTransform(
+      PTransform<PCollection<ElemT>, PCollection<ElemT>> transform, TranslationContext context) {
+
+    Dataset<WindowedValue<ElemT>> inputDataSet = context.getDataset(context.getInput());
+
+    @SuppressWarnings("unchecked") AppliedPTransform<
+            PCollection<ElemT>, PCollection<ElemT>,
+            PTransform<PCollection<ElemT>, PCollection<ElemT>>>
+        application =
+            (AppliedPTransform<
+                    PCollection<ElemT>, PCollection<ElemT>,
+                    PTransform<PCollection<ElemT>, PCollection<ElemT>>>)
+                context.getCurrentTransform();
+    PCollectionView<ViewT> input;
+    try {
+      input = CreatePCollectionViewTranslation.getView(application);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    context.setSideInputDataset(input, inputDataSet);
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 7314298..fa208f3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -75,6 +75,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
 
     // TODO: add support of SideInputs
     List<PCollectionView<?>> sideInputs = getSideInputs(context);
+    System.out.println("sideInputs = " + sideInputs);
    final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0;
 
     checkState(!hasSideInputs, "SideInputs are not supported for the moment.");
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 6715407..8424d43 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -67,6 +67,10 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
 
     TRANSFORM_TRANSLATORS.put(
         PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+
+    TRANSFORM_TRANSLATORS.put(
+        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
+        new CreatePCollectionViewTranslatorBatch());
   }
 
   public PipelineTranslatorBatch(SparkPipelineOptions options) {
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
index 48350df..c028dc0 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
+import java.util.List;
+
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -26,7 +28,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -83,4 +87,27 @@ public class ParDoTest implements Serializable {
         }));
     pipeline.run();
   }
+
+  @Test
+  public void testSideInput() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    final PCollectionView<List<Integer>> sideInput =
+        input.apply(View.asList());
+
+    input.apply(
+        ParDo.of(
+                new DoFn<Integer, Integer>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    List<Integer> list = context.sideInput(sideInput);
+
+                    Integer val = context.element();
+                    context.output(val);
+                    System.out.println("ParDo1: val = " + val + ", sideInput = " + list);
+                  }
+                })
+            .withSideInputs(sideInput));
+
+    pipeline.run();
+  }
 }
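
Note: this commit only registers side-input datasets. setSideInputDataset stores each view's Dataset in broadcastDataSets, but nothing reads the map back yet, and ParDoTranslatorBatch still rejects pipelines with side inputs via checkState. A minimal sketch of the matching accessor, assuming it would sit next to setSideInputDataset in TranslationContext (the name getSideInputDataSet is hypothetical, not part of this commit):

  // Hypothetical accessor (not in this commit): returns the Dataset previously
  // registered for a view via setSideInputDataset, or null if none was registered.
  @SuppressWarnings("unchecked")
  public <ViewT, ElemT> Dataset<WindowedValue<ElemT>> getSideInputDataSet(
      PCollectionView<ViewT> value) {
    // The unchecked cast is unavoidable because the map stores Dataset<?>;
    // the element type is re-asserted by the caller that registered the view.
    return (Dataset<WindowedValue<ElemT>>) broadcastDataSets.get(value);
  }

With such an accessor, a ParDo translator could look each side input's Dataset up by view, collect it on the driver, and broadcast it to the workers before invoking the DoFn, which is what removing the checkState guard would require.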