This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit abf4b46a8a547a343b240af8ad895f0ec6975423 Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Nov 21 09:36:32 2018 +0100 Add global pipeline translation structure --- .../runners/spark/structuredstreaming/SparkRunner.java | 9 ++++----- .../translation/PipelineTranslator.java | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java index 62cd7d3..59c08f7 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java @@ -2,15 +2,14 @@ package org.apache.beam.runners.spark.structuredstreaming; import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage; -import org.apache.beam.runners.spark.structuredstreaming.translation.BatchPipelineTranslator; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.BatchPipelineTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; -import org.apache.beam.runners.spark.structuredstreaming.translation.StreamingPipelineTranslator; +import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,10 +98,10 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { PipelineTranslator.detectTranslationMode(pipeline, options); PipelineTranslator.replaceTransforms(pipeline, options); PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options); - PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator() + PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator(); //init translator with subclass based on mode and env translator.translate(pipeline); } - public void executePipeline(Pipeline pipeline) {} + private void executePipeline(Pipeline pipeline) {} } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java index f0ce1e5..99621f6 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java @@ -3,6 +3,8 @@ package org.apache.beam.runners.spark.structuredstreaming.translation; import org.apache.beam.runners.core.construction.PipelineResources; import org.apache.beam.runners.spark.SparkTransformOverrides; import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.BatchPipelineTranslator; +import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.values.PCollection; @@ -11,7 +13,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Does all the translation work: mode detection, nodes translation. + /** + * The role of this class is to detect the pipeline mode and to translate the Beam operators to their Spark counterparts. If we have + * a streaming job, this is instantiated as a {@link StreamingPipelineTranslator}. In other + * case, i.e. for a batch job, a {@link BatchPipelineTranslator} is created. Correspondingly, */ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{ @@ -41,11 +46,18 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{ TranslationModeDetector detector = new TranslationModeDetector(); pipeline.traverseTopologically(detector); if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) { - // set streaming mode if it's a streaming pipeline options.setStreaming(true); } } + /** + * Translates the pipeline by passing this class as a visitor. + * + * @param pipeline The pipeline to be translated + */ + public void translate(Pipeline pipeline) { + pipeline.traverseTopologically(this); + }
