This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 866ef13df71f5042e0fbd8e33a13ac1e0308d487
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed Nov 28 14:52:20 2018 +0100

    Add basic pipeline execution.
    Refactor translatePipeline() to return the translationContext on which we can run startPipeline()
---
 .../spark/structuredstreaming/SparkRunner.java     | 10 +++---
 .../translation/PipelineTranslator.java            |  4 ++
 .../translation/TranslationContext.java            | 72 ++++++++++++++++++----
 3 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index e3fd6b4..8e0cf25 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming;
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
 import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
 import org.apache.beam.sdk.Pipeline;
@@ -109,13 +110,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   @Override
   public SparkPipelineResult run(final Pipeline pipeline) {
-    translatePipeline(pipeline);
+    TranslationContext translationContext = translatePipeline(pipeline);
     //TODO initialise other services: checkpointing, metrics system, listeners, ...
-    executePipeline(pipeline);
+    translationContext.startPipeline();
     return new SparkPipelineResult();
   }
 
-  private void translatePipeline(Pipeline pipeline) {
+  private TranslationContext translatePipeline(Pipeline pipeline) {
     PipelineTranslator.detectTranslationMode(pipeline, options);
     PipelineTranslator.replaceTransforms(pipeline, options);
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
@@ -124,7 +125,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
             ? new StreamingPipelineTranslator(options)
             : new PipelineTranslatorBatch(options);
     pipelineTranslator.translate(pipeline);
+    return pipelineTranslator.getTranslationContext();
   }
-
-  private void executePipeline(Pipeline pipeline) {}
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index d64b8b1..e0924e3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -210,4 +210,8 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
     }
     applyTransformTranslator(node, transformTranslator);
   }
+
+  public TranslationContext getTranslationContext() {
+    return translationContext;
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index aa831ed..71ae276 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -19,14 +19,21 @@ package org.apache.beam.runners.spark.structuredstreaming.translation;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
 
 /**
  * Base class that gives a context for {@link PTransform} translation: keeping track of the
@@ -34,20 +41,18 @@ import org.apache.spark.sql.SparkSession;
  */
 public class TranslationContext {
 
+  private final Map<PValue, Dataset<?>> datasets;
+  // Datasets that startPipeline() starts as streaming queries (presumably the pipeline leaves).
+  // TODO: nothing in this class adds to `leaves` yet, so startPipeline() starts no query.
+  private final Set<Dataset<?>> leaves;
+  private final SparkPipelineOptions options;
+
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
   private AppliedPTransform<?, ?, ?> currentTransform;
 
-  private final Map<PValue, Dataset<?>> datasets;
-
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
   private SparkSession sparkSession;
 
-  private final SparkPipelineOptions options;
-
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
-
   public TranslationContext(SparkPipelineOptions options) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.setMaster(options.getSparkMaster());
@@ -59,5 +64,56 @@ public class TranslationContext {
     this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     this.options = options;
     this.datasets = new HashMap<>();
+    this.leaves = new LinkedHashSet<>();
+  }
+
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
+
+  /**
+   * Starts one streaming query per leaf dataset, then blocks until every query has terminated.
+   *
+   * <p>All queries are started before any is awaited: {@code awaitTermination()} blocks, so
+   * awaiting inside the start loop would keep later leaves from running until earlier ones end.
+   *
+   * @throws RuntimeException wrapping the {@link StreamingQueryException} of a failed query
+   */
+  public void startPipeline() {
+    // A streaming query only runs once a DataStreamWriter with a sink is started; the no-op
+    // writer is that sink.
+    List<StreamingQuery> queries = new ArrayList<>();
+    for (Dataset<?> dataset : leaves) {
+      queries.add(dataset.writeStream().foreach(new NoOpForeachWriter<>()).start());
+    }
+    try {
+      for (StreamingQuery query : queries) {
+        query.awaitTermination();
+      }
+    } catch (StreamingQueryException e) {
+      throw new RuntimeException("Pipeline execution failed", e);
+    }
+  }
+
+  /**
+   * Sink that writes nothing. {@link #open} returns {@code false}, which tells Spark to skip
+   * {@link #process} for that partition and epoch.
+   */
+  private static class NoOpForeachWriter<T> extends ForeachWriter<T> {
+
+    @Override
+    public boolean open(long partitionId, long epochId) {
+      return false;
+    }
+
+    @Override
+    public void process(T value) {
+      // do nothing
+    }
+
+    @Override
+    public void close(Throwable errorOrNull) {
+      // do nothing
+    }
   }
 }
