This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 051e8dc47ce952606bce2fec93f05fd877b2cdcd Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Nov 21 10:52:43 2018 +0100 Renames: better differentiate pipeline translator from transform translator --- .../spark/structuredstreaming/SparkRunner.java | 6 ++-- .../translation/batch/BatchPipelineTranslator.java | 39 +++++++++++----------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java index 59c08f7..3e3b112 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java @@ -98,9 +98,9 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { PipelineTranslator.detectTranslationMode(pipeline, options); PipelineTranslator.replaceTransforms(pipeline, options); PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options); - PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator(); - //init translator with subclass based on mode and env - translator.translate(pipeline); + PipelineTranslator pipelineTranslator = options.isStreaming() ? 
new StreamingPipelineTranslator() : new BatchPipelineTranslator(); + //init pipelineTranslator with subclass based on mode and env + pipelineTranslator.translate(pipeline); } private void executePipeline(Pipeline pipeline) {} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java index e20e4c0..2459372 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java @@ -24,39 +24,38 @@ public class BatchPipelineTranslator extends PipelineTranslator { private int depth = 0; @SuppressWarnings("rawtypes") - private static final Map<String, BatchTransformTranslator> - TRANSLATORS = new HashMap<>(); + private static final Map<String, BatchTransformTranslator> TRANSFORM_TRANSLATORS = new HashMap<>(); static { - TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, + TRANSFORM_TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new CombinePerKeyTranslatorBatch()); - TRANSLATORS + TRANSFORM_TRANSLATORS .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch()); - TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch()); + TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch()); - TRANSLATORS + TRANSFORM_TRANSLATORS .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch()); - TRANSLATORS + TRANSFORM_TRANSLATORS .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch()); - 
TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch()); + TRANSFORM_TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch()); - TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch()); + TRANSFORM_TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch()); } private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class); /** Returns a translator for the given node, if it is possible, otherwise null. */ - private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) { + private static BatchTransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) { @Nullable PTransform<?, ?> transform = node.getTransform(); // Root of the graph is null if (transform == null) { return null; } @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform); - return (urn == null) ? null : TRANSLATORS.get(urn); + return (urn == null) ? null : TRANSFORM_TRANSLATORS.get(urn); } @@ -69,10 +68,10 @@ public class BatchPipelineTranslator extends PipelineTranslator { LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName()); depth++; - BatchTransformTranslator<?> translator = getTranslator(node); + BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node); - if (translator != null) { - translateNode(node, translator); + if (transformTranslator != null) { + translateNode(node, transformTranslator); LOG.info("{} translated- {}", genSpaces(depth), node.getFullName()); return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } else { @@ -92,28 +91,28 @@ public class BatchPipelineTranslator extends PipelineTranslator { // get the transformation corresponding to the node we are // currently visiting and translate it into its Spark alternative. 
- BatchTransformTranslator<?> translator = getTranslator(node); - if (translator == null) { + BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node); + if (transformTranslator == null) { String transformUrn = PTransformTranslation.urnForTransform(node.getTransform()); throw new UnsupportedOperationException( "The transform " + transformUrn + " is currently not supported."); } - translateNode(node, translator); + translateNode(node, transformTranslator); } private <T extends PTransform<?, ?>> void translateNode( TransformHierarchy.Node node, - BatchTransformTranslator<?> translator) { + BatchTransformTranslator<?> transformTranslator) { @SuppressWarnings("unchecked") T typedTransform = (T) node.getTransform(); @SuppressWarnings("unchecked") - BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; + BatchTransformTranslator<T> typedTransformTranslator = (BatchTransformTranslator<T>) transformTranslator; // create the applied PTransform on the translationContext translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); - typedTranslator.translateNode(typedTransform, translationContext); + typedTransformTranslator.translateNode(typedTransform, translationContext); }
