mosche commented on code in PR #24009:
URL: https://github.com/apache/beam/pull/24009#discussion_r1025008863
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -17,170 +17,336 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation;
+import static
org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+import static
org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
+
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
+import
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderProvider;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * {@link Pipeline.PipelineVisitor} that translates the Beam operators to
their Spark counterparts.
- * It also does the pipeline preparation: mode detection, transforms
replacement, classpath
- * preparation.
+ * The pipeline translator translates a Beam {@link Pipeline} into its Spark
correspondence, which can
+ * then be evaluated.
+ *
+ * <p>The translation involves traversing the hierarchy of a pipeline multiple
times:
+ *
+ * <ol>
+ * <li>Detect if {@link StreamingOptions#setStreaming streaming} mode is
required.
+ * <li>Identify datasets that are repeatedly used as input and should be
cached.
+ * <li>And finally, translate each primitive or composite {@link PTransform}
that is {@link
+ * #getTransformTranslator known} and {@link
TransformTranslator#canTranslate supported} into
+ * its Spark correspondence. If a composite is not supported, it will be
expanded further into
+ * its parts and translated then.
+ * </ol>
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-public abstract class PipelineTranslator extends
Pipeline.PipelineVisitor.Defaults {
+@Internal
+public abstract class PipelineTranslator {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineTranslator.class);
- protected TranslationContext translationContext;
- //
--------------------------------------------------------------------------------------------
- // Pipeline preparation methods
- //
--------------------------------------------------------------------------------------------
public static void replaceTransforms(Pipeline pipeline, StreamingOptions
options) {
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(options.isStreaming()));
}
/**
- * Visit the pipeline to determine the translation mode (batch/streaming)
and update options
- * accordingly.
+ * Analyse the pipeline to determine if we have to switch to streaming mode
for the pipeline
+ * translation and update {@link StreamingOptions} accordingly.
*/
- public static void detectTranslationMode(Pipeline pipeline, StreamingOptions
options) {
- TranslationModeDetector detector = new TranslationModeDetector();
+ public static void detectStreamingMode(Pipeline pipeline, StreamingOptions
options) {
+ StreamingModeDetector detector = new
StreamingModeDetector(options.isStreaming());
pipeline.traverseTopologically(detector);
- if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
- options.setStreaming(true);
+ options.setStreaming(detector.streaming);
+ }
+
+ /** Returns a {@link TransformTranslator} for the given {@link PTransform}
if known. */
+ protected abstract @Nullable <
+ InT extends PInput, OutT extends POutput, TransformT extends
PTransform<InT, OutT>>
+ TransformTranslator<InT, OutT, TransformT>
getTransformTranslator(TransformT transform);
+
+ /**
+ * Translates a Beam pipeline into its Spark correspondence using the Spark
SQL / Dataset API.
+ *
+ * <p>Note, in some cases this involves the early evaluation of some parts
of the pipeline. For
+ * example, in order to use a side-input {@link
org.apache.beam.sdk.values.PCollectionView
+ * PCollectionView} in a translation the corresponding Spark {@link
+ * org.apache.spark.sql.Dataset Dataset} might have to
be collected and
+ * broadcast to be able to continue with the translation.
+ *
+ * @return The result of the translation is an {@link EvaluationContext}
that can trigger the
+ * evaluation of the Spark pipeline.
+ */
+ public EvaluationContext translate(
+ Pipeline pipeline, SparkSession session, SparkCommonPipelineOptions
options) {
+ LOG.debug("starting translation of the pipeline using {}",
getClass().getName());
+ DependencyVisitor dependencies = new DependencyVisitor();
+ pipeline.traverseTopologically(dependencies);
+
+ TranslatingVisitor translator = new TranslatingVisitor(session, options,
dependencies.results);
+ pipeline.traverseTopologically(translator);
+
+ return new EvaluationContext(translator.leaves, session);
+ }
+
+ /**
+ * The correspondence of a {@link PCollection} as result of translating a
{@link PTransform}
+ * including additional metadata (such as name and dependents).
+ */
+ private static final class TranslationResult<T> implements
EvaluationContext.NamedDataset<T> {
+ private final String name;
+ private @Nullable Dataset<WindowedValue<T>> dataset = null;
+ private final Set<PTransform<?, ?>> dependentTransforms = new HashSet<>();
+
+ private TranslationResult(PCollection<?> pCol) {
+ this.name = pCol.getName();
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public @Nullable Dataset<WindowedValue<T>> dataset() {
+ return dataset;
}
}
- /** The translation mode of the Beam Pipeline. */
- private enum TranslationMode {
+ /** Shared, mutable state during the translation of a pipeline and discarded
afterwards. */
+ interface TranslationState extends EncoderProvider {
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]