This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit ec9d63462703ebe9fdac170c8ffa1aeb1d14972c Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Nov 21 12:13:21 2018 +0100 Initialise BatchTranslationContext --- .../runners/spark/structuredstreaming/SparkRunner.java | 2 +- .../translation/batch/BatchPipelineTranslator.java | 7 ++++++- .../translation/batch/BatchTranslationContext.java | 17 ++++++++++++++--- .../streaming/StreamingPipelineTranslator.java | 5 ++++- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java index ab2215b..de20133 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java @@ -98,7 +98,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { PipelineTranslator.detectTranslationMode(pipeline, options); PipelineTranslator.replaceTransforms(pipeline, options); PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options); - PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator(); + PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator(options) : new BatchPipelineTranslator(options); pipelineTranslator.translate(pipeline); } private void executePipeline(Pipeline pipeline) {} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java index 2459372..1bf660f 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java @@ -4,10 +4,13 @@ import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +48,9 @@ public class BatchPipelineTranslator extends PipelineTranslator { } private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class); - + public BatchPipelineTranslator(SparkPipelineOptions options) { + translationContext = new BatchTranslationContext(options); + } /** Returns a translator for the given node, if it is possible, otherwise null. */ private static BatchTransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) { diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java index 1d991f1..b53aa19 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java @@ -5,6 +5,7 @@ import java.util.Map; import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.values.PValue; +import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; @@ -20,14 +21,24 @@ public class BatchTranslationContext { */ private final Map<PValue, Dataset<?>> danglingDataSets; - private final SparkSession sparkSession; + private SparkSession sparkSession; private final SparkPipelineOptions options; private AppliedPTransform<?, ?, ?> currentTransform; - public BatchTranslationContext(SparkSession sparkSession, SparkPipelineOptions options) { - this.sparkSession = sparkSession; + public BatchTranslationContext(SparkPipelineOptions options) { + SparkConf sparkConf = new SparkConf(); + sparkConf.setMaster(options.getSparkMaster()); + sparkConf.setAppName(options.getAppName()); + if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) { + sparkConf.setJars(options.getFilesToStage().toArray(new String[0])); + } + + SparkSession sparkSession = SparkSession + .builder() + .config(sparkConf) + .getOrCreate(); this.options = options; this.datasets = new HashMap<>(); this.danglingDataSets = new HashMap<>(); diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java index 547083c..7bed930 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java @@ -1,7 +1,10 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.streaming; +import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; public class StreamingPipelineTranslator extends PipelineTranslator { -//TODO impl + + public StreamingPipelineTranslator(SparkPipelineOptions options) { + } }
