This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7a645e1bd44f95cae5108c63d5f49f555c91f7d6
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Nov 29 11:48:20 2018 +0100

    Create PCollections manipulation methods
---
 .../translation/TranslationContext.java           | 56 +++++++++++++++++++++-
 1 file changed, 55 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 71ae276..a3276bf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -17,17 +17,25 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
+import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.StreamingQueryException;
@@ -40,6 +48,7 @@ public class TranslationContext {
 
   private final Map<PValue, Dataset<?>> datasets;
   private final Set<Dataset<?>> leaves;
+
   private final SparkPipelineOptions options;
 
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
@@ -62,10 +71,55 @@ public class TranslationContext {
     this.leaves = new LinkedHashSet<>();
   }
 
+  // --------------------------------------------------------------------------------------------
+  // Transforms methods
+  // --------------------------------------------------------------------------------------------
   public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
     this.currentTransform = currentTransform;
   }
 
+  // --------------------------------------------------------------------------------------------
+  // Datasets methods
+  // --------------------------------------------------------------------------------------------
+
+
+  // --------------------------------------------------------------------------------------------
+  // PCollections methods
+  // --------------------------------------------------------------------------------------------
+  @SuppressWarnings("unchecked")
+  public PValue getInput() {
+    return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<TupleTag<?>, PValue> getInputs() {
+    return currentTransform.getInputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  public PValue getOutput() {
+    return Iterables.getOnlyElement(currentTransform.getOutputs().values());
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<TupleTag<?>, PValue> getOutputs() {
+    return currentTransform.getOutputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
+    return currentTransform
+        .getOutputs()
+        .entrySet()
+        .stream()
+        .filter(e -> e.getValue() instanceof PCollection)
+        .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Pipeline methods
+  // --------------------------------------------------------------------------------------------
  public void startPipeline() {
    try {
      // to start a pipeline we need a DatastreamWriter to start
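
Note for reviewers: below is a minimal usage sketch of the new accessors, assuming a translator class in the same package; the class name and translate() signature are illustrative only, and nothing besides the TranslationContext calls comes from this commit. getInput() and getOutput() rely on Iterables.getOnlyElement, which throws IllegalArgumentException unless there is exactly one element, so they only suit single-input/single-output transforms; getInputs()/getOutputs() are the general forms.

import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

// Illustrative translator: name and method shape are assumptions, not part of
// this commit; TranslationContext is assumed visible from the same package.
class ExampleTranslator {

  void translate(TranslationContext context) {
    // Single non-additional input of the transform currently being
    // translated (set beforehand via setCurrentTransform).
    PValue input = context.getInput();

    // General form when a transform produces several tagged outputs.
    Map<TupleTag<?>, PValue> outputs = context.getOutputs();

    // Coders of the PCollection outputs, e.g. to derive Spark encoders later.
    Map<TupleTag<?>, Coder<?>> outputCoders = context.getOutputCoders();
  }
}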
