This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6695d6462020afd46857c7b50f981d4187c4a802 Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Nov 21 11:31:43 2018 +0100 Organise methods in PipelineTranslator --- .../spark/structuredstreaming/SparkRunner.java | 1 - .../translation/PipelineTranslator.java | 64 +++++++++++++--------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java index 3e3b112..ab2215b 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java @@ -99,7 +99,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { PipelineTranslator.replaceTransforms(pipeline, options); PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options); PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator(); - //init pipelineTranslator with subclass based on mode and env pipelineTranslator.translate(pipeline); } private void executePipeline(Pipeline pipeline) {} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java index db5c354..8eb1fb6 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java @@ -21,6 +21,9 @@ import org.slf4j.LoggerFactory; public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{ + // -------------------------------------------------------------------------------------------- + // Pipeline preparation methods + // -------------------------------------------------------------------------------------------- /** * Local configurations work in the same JVM and have no problems with improperly formatted files * on classpath (eg. directories with .class files or empty directories). Prepare files for @@ -49,32 +52,6 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{ } } - /** - * Utility formatting method. - * - * @param n number of spaces to generate - * @return String with "|" followed by n spaces - */ - protected static String genSpaces(int n) { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < n; i++) { - builder.append("| "); - } - return builder.toString(); - } - - /** - * Translates the pipeline by passing this class as a visitor. - * - * @param pipeline The pipeline to be translated - */ - public void translate(Pipeline pipeline) { - pipeline.traverseTopologically(this); - } - - - - /** The translation mode of the Beam Pipeline. */ private enum TranslationMode { @@ -116,4 +93,39 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{ } } + // -------------------------------------------------------------------------------------------- + // Pipeline utility methods + // -------------------------------------------------------------------------------------------- + + /** + * Utility formatting method. + * + * @param n number of spaces to generate + * @return String with "|" followed by n spaces + */ + protected static String genSpaces(int n) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < n; i++) { + builder.append("| "); + } + return builder.toString(); + } + + // -------------------------------------------------------------------------------------------- + // Pipeline visitor methods + // -------------------------------------------------------------------------------------------- + + /** + * Translates the pipeline by passing this class as a visitor. + * + * @param pipeline The pipeline to be translated + */ + public void translate(Pipeline pipeline) { + pipeline.traverseTopologically(this); + } + + + + + }
