This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 28a9422293fc7390286bea084d2c7c895d2b32b6 Author: Etienne Chauchot <[email protected]> AuthorDate: Wed Nov 21 09:36:49 2018 +0100 Add nodes translators structure --- .../translation/BatchPipelineTranslator.java | 20 ------- .../translation/batch/BatchPipelineTranslator.java | 66 ++++++++++++++++++++++ .../batch/BatchTransformTranslator.java | 11 ++++ .../translation/batch/BatchTranslationContext.java | 36 ++++++++++++ .../batch/CombinePerKeyTranslatorBatch.java | 14 +++++ .../batch/FlattenPCollectionTranslatorBatch.java | 13 +++++ .../batch/GroupByKeyTranslatorBatch.java | 14 +++++ .../translation/batch/ParDoTranslatorBatch.java | 13 +++++ .../batch/ReadSourceTranslatorBatch.java | 12 ++++ .../batch/ReshuffleTranslatorBatch.java | 11 ++++ .../batch/WindowAssignTranslatorBatch.java | 12 ++++ .../StreamingPipelineTranslator.java | 6 +- 12 files changed, 206 insertions(+), 22 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java deleted file mode 100644 index e66555c..0000000 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.apache.beam.runners.spark.structuredstreaming.translation; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.values.PValue; - -public class BatchPipelineTranslator extends PipelineTranslator { - - - @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - return super.enterCompositeTransform(node); - } - - - @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - super.visitPrimitiveTransform(node); - } - - -} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java new file mode 100644 index 0000000..2f7ac23 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java @@ -0,0 +1,66 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; + +/** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Spark batch job. */ + +public class BatchPipelineTranslator extends PipelineTranslator { + + + // -------------------------------------------------------------------------------------------- + // Transform Translator Registry + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static final Map<String, BatchTransformTranslator> + TRANSLATORS = new HashMap<>(); + + static { + TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, + new CombinePerKeyTranslatorBatch()); + TRANSLATORS + .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch()); + TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch()); + + TRANSLATORS + .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch()); + + TRANSLATORS + .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch()); + + TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch()); + + TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch()); + } + + /** Returns a translator for the given node, if it is possible, otherwise null. */ + private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) { + @Nullable PTransform<?, ?> transform = node.getTransform(); + // Root of the graph is null + if (transform == null) { + return null; + } + @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform); + return (urn == null) ? null : TRANSLATORS.get(urn); + } + + + @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + return super.enterCompositeTransform(node); + //TODO impl + } + + + @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { + super.visitPrimitiveTransform(node); + //TODO impl + } + + } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java new file mode 100644 index 0000000..ab0cf68 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java @@ -0,0 +1,11 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.PTransform; + +public interface BatchTransformTranslator<TransformT extends PTransform> { + + /** A translator of a {@link PTransform} in batch mode. */ + + void translateNode(TransformT transform, BatchTranslationContext context); + } + diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java new file mode 100644 index 0000000..554beea --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java @@ -0,0 +1,36 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.values.PValue; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; + +/** + * Keeps track of the {@link Dataset} and the step the translation is in. + */ +public class BatchTranslationContext { + private final Map<PValue, Dataset<?>> datasets; + + /** + * For keeping track about which DataSets don't have a successor. We need to terminate these with + * a discarding sink because the Beam model allows dangling operations. + */ + private final Map<PValue, Dataset<?>> danglingDataSets; + + private final SparkSession sparkSession; + private final SparkPipelineOptions options; + + private AppliedPTransform<?, ?, ?> currentTransform; + + + public BatchTranslationContext(SparkSession sparkSession, SparkPipelineOptions options) { + this.sparkSession = sparkSession; + this.options = options; + this.datasets = new HashMap<>(); + this.danglingDataSets = new HashMap<>(); + } + +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java new file mode 100644 index 0000000..6099fbc --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java @@ -0,0 +1,14 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> implements BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> { + + @Override public void translateNode( + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform, + BatchTranslationContext context) { + + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java new file mode 100644 index 0000000..281eda9 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java @@ -0,0 +1,13 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; + +class FlattenPCollectionTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> { + + @Override public void translateNode(PTransform<PCollectionList<T>, PCollection<T>> transform, + BatchTranslationContext context) { + + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java new file mode 100644 index 0000000..bb0ccc1 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -0,0 +1,14 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +class GroupByKeyTranslatorBatch<K, InputT> implements BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> { + + @Override public void translateNode( + PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform, + BatchTranslationContext context) { + + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java new file mode 100644 index 0000000..4477853 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -0,0 +1,13 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; + +class ParDoTranslatorBatch<InputT, OutputT> implements BatchTransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> { + + @Override public void translateNode(PTransform<PCollection<InputT>, PCollectionTuple> transform, + BatchTranslationContext context) { + + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java new file mode 100644 index 0000000..a30fa70 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -0,0 +1,12 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; + +class ReadSourceTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PBegin, PCollection<T>>> { + + @Override public void translateNode(PTransform<PBegin, PCollection<T>> transform, BatchTranslationContext context) { + + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java new file mode 100644 index 0000000..6283fdb --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java @@ -0,0 +1,11 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.Reshuffle; + +class ReshuffleTranslatorBatch<K, InputT> implements BatchTransformTranslator<Reshuffle<K, InputT>> { + + @Override public void translateNode(Reshuffle<K, InputT> transform, + BatchTranslationContext context) { + + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java new file mode 100644 index 0000000..21b71b9 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -0,0 +1,12 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; + +class WindowAssignTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> { + + @Override public void translateNode(PTransform<PCollection<T>, PCollection<T>> transform, + BatchTranslationContext context) { + + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java similarity index 53% rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java index 2058b37..547083c 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java @@ -1,5 +1,7 @@ -package org.apache.beam.runners.spark.structuredstreaming.translation; +package org.apache.beam.runners.spark.structuredstreaming.translation.streaming; -public class StreamingPipelineTranslator extends PipelineTranslator { +import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; +public class StreamingPipelineTranslator extends PipelineTranslator { +//TODO impl }
