This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1c977888ed4a3572a7af8d476fbcaa48cc36e5b4
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Nov 16 19:06:04 2018 +0100

    Start pipeline translation
---
 .../structuredstreaming/SparkPipelineResult.java   |  29 +++
 .../spark/structuredstreaming/SparkRunner.java     | 108 +++++++++
 .../translation/BatchPipelineTranslator.java       |  20 ++
 .../translation/EvaluationContext.java             | 261 ---------------------
 .../translation/PipelineTranslator.java            |  94 ++++++++
 .../translation/SparkTransformOverrides.java       |  52 ++++
 .../translation/StreamingPipelineTranslator.java   |   5 +
 7 files changed, 308 insertions(+), 261 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java
new file mode 100644
index 0000000..82d1b90
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java
@@ -0,0 +1,29 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+
+public class SparkPipelineResult implements PipelineResult {
+
+  @Override public State getState() {
+    return null;
+  }
+
+  @Override public State cancel() throws IOException {
+    return null;
+  }
+
+  @Override public State waitUntilFinish(Duration duration) {
+    return null;
+  }
+
+  @Override public State waitUntilFinish() {
+    return null;
+  }
+
+  @Override public MetricResults metrics() {
+    return null;
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
new file mode 100644
index 0000000..62cd7d3
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -0,0 +1,108 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.BatchPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.StreamingPipelineTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SparkRunner translates operations defined on a pipeline to a representation executable by
+ * Spark, and then submits the job to Spark to be executed. If we wanted to run a Beam pipeline
+ * with the default options of a single-threaded Spark instance in local mode, we would do the
+ * following:
+ *
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result =
+ * (SparkPipelineResult) p.run(); }
+ *
+ * <p>To create a pipeline runner to run against a different Spark cluster, with a custom master
+ * URL, we would do the following:
+ *
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options =
+ * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
+ * SparkPipelineResult result = (SparkPipelineResult) p.run(); }
+ */
+public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
+
+  /** Options used in this pipeline runner. */
+  private final SparkPipelineOptions options;
+
+  /**
+   * Creates and returns a new SparkRunner with default options. In particular, against a Spark
+   * instance running in local mode.
+   *
+   * @return A pipeline runner with default options.
+   */
+  public static SparkRunner create() {
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * @param options The SparkPipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner create(SparkPipelineOptions options) {
+    return new SparkRunner(options);
+  }
+
+  /**
+   * Creates and returns a new SparkRunner with specified options.
+   *
+   * @param options The PipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static SparkRunner fromOptions(PipelineOptions options) {
+    SparkPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+
+    if (sparkOptions.getFilesToStage() == null) {
+      sparkOptions.setFilesToStage(
+          detectClassPathResourcesToStage(SparkRunner.class.getClassLoader()));
+      LOG.info(
+          "PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged.",
+          sparkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", sparkOptions.getFilesToStage());
+    }
+
+    return new SparkRunner(sparkOptions);
+  }
+
+  /**
+   * Constructor; the default options run this pipeline in Spark's local mode, in a single
+   * thread.
+   */
+  private SparkRunner(SparkPipelineOptions options) {
+    this.options = options;
+  }
+
+  @Override public SparkPipelineResult run(final Pipeline pipeline) {
+    translatePipeline(pipeline);
+    executePipeline(pipeline);
+    return new SparkPipelineResult();
+  }
+
+  private void translatePipeline(Pipeline pipeline) {
+    PipelineTranslator.detectTranslationMode(pipeline, options);
+    PipelineTranslator.replaceTransforms(pipeline, options);
+    PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
+    // init translator with the subclass that matches the detected mode (batch or streaming)
+    PipelineTranslator translator =
+        options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
+    translator.translate(pipeline);
+  }
+
+  public void executePipeline(Pipeline pipeline) {}
+
+}
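A minimal usage sketch of the runner added above, not part of this commit: it assumes the
SparkPipelineOptions interface referenced throughout the diff exposes setSparkMaster() the way
its counterpart in the classic Spark runner does, and the class name and Create.of() values are
purely illustrative. Note that at this commit run() still returns a stub SparkPipelineResult
whose methods return null.

    import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
    import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineResult;
    import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class StructuredStreamingRunnerSketch {
      public static void main(String[] args) {
        // Bind the options to the new runner; setSparkMaster() is assumed to
        // mirror the classic Spark runner's SparkPipelineOptions.
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        options.setSparkMaster("local[4]"); // or "spark://host:port" for a remote cluster

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of(1, 2, 3)); // [logic for pipeline creation]

        // run() translates the pipeline; the returned result is still a stub here.
        SparkPipelineResult result = (SparkPipelineResult) p.run();
      }
    }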
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
new file mode 100644
index 0000000..e66555c
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
@@ -0,0 +1,20 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+
+public class BatchPipelineTranslator extends PipelineTranslator {
+
+  @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    return super.enterCompositeTransform(node);
+  }
+
+  @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    super.visitPrimitiveTransform(node);
+  }
+
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
deleted file mode 100644
index 47a3098..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.construction.TransformInputs;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-/**
- * The EvaluationContext allows us to define pipeline instructions and translate between {@code
- * PObject<T>}s or {@code PCollection<T>}s and Ts or DStreams/RDDs of Ts.
- */
-public class EvaluationContext {
-  private SparkSession sparkSession;
-  private final Pipeline pipeline;
-  private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
-  private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
-  private final Set<Dataset> leaves = new LinkedHashSet<>();
-  private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
-  private AppliedPTransform<?, ?, ?> currentTransform;
-  private final SparkPCollectionView pviews = new SparkPCollectionView();
-  private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
-  private final PipelineOptions options;
-  private final SerializablePipelineOptions serializableOptions;
-
-  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
-    this.jsc = jsc;
-    this.pipeline = pipeline;
-    this.options = options;
-    this.serializableOptions = new SerializablePipelineOptions(options);
-  }
-
-  public EvaluationContext(
-      JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options, JavaStreamingContext jssc) {
-    this(jsc, pipeline, options);
-    this.jssc = jssc;
-  }
-
-  public JavaSparkContext getSparkContext() {
-    return jsc;
-  }
-
-  public JavaStreamingContext getStreamingContext() {
-    return jssc;
-  }
-
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  public PipelineOptions getOptions() {
-    return options;
-  }
-
-  public SerializablePipelineOptions getSerializableOptions() {
-    return serializableOptions;
-  }
-
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-    this.currentTransform = transform;
-  }
-
-  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
-  }
-
-  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    @SuppressWarnings("unchecked")
-    T input =
-        (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
-    return input;
-  }
-
-  public <T> Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
-    checkArgument(currentTransform != null, "can only be called with non-null currentTransform");
-    checkArgument(
-        currentTransform.getTransform() == transform, "can only be called with current transform");
-    return currentTransform.getInputs();
-  }
-
-  public <T extends PValue> T getOutput(PTransform<?, T> transform) {
-    @SuppressWarnings("unchecked")
-    T output = (T) Iterables.getOnlyElement(getOutputs(transform).values());
-    return output;
-  }
-
-  public Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
-    checkArgument(currentTransform != null, "can only be called with non-null currentTransform");
-    checkArgument(
-        currentTransform.getTransform() == transform, "can only be called with current transform");
-    return currentTransform.getOutputs();
-  }
-
-  public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
-    return currentTransform
-        .getOutputs()
-        .entrySet()
-        .stream()
-        .filter(e -> e.getValue() instanceof PCollection)
-        .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
-  }
-
-  private boolean shouldCache(PValue pvalue) {
-    if ((pvalue instanceof PCollection)
-        && cacheCandidates.containsKey(pvalue)
-        && cacheCandidates.get(pvalue) > 1) {
-      return true;
-    }
-    return false;
-  }
-
-  public void putDataset(
-      PTransform<?, ? extends PValue> transform, Dataset dataset, boolean forceCache) {
-    putDataset(getOutput(transform), dataset, forceCache);
-  }
-
-  public void putDataset(PTransform<?, ? extends PValue> transform, Dataset dataset) {
-    putDataset(transform, dataset, false);
-  }
-
-  public void putDataset(PValue pvalue, Dataset dataset, boolean forceCache) {
-    try {
-      dataset.setName(pvalue.getName());
-    } catch (IllegalStateException e) {
-      // name not set, ignore
-    }
-    if ((forceCache || shouldCache(pvalue)) && pvalue instanceof PCollection) {
-      // we cache only PCollection
-      Coder<?> coder = ((PCollection<?>) pvalue).getCoder();
-      Coder<? extends BoundedWindow> wCoder =
-          ((PCollection<?>) pvalue).getWindowingStrategy().getWindowFn().windowCoder();
-      dataset.cache(storageLevel(), WindowedValue.getFullCoder(coder, wCoder));
-    }
-    datasets.put(pvalue, dataset);
-    leaves.add(dataset);
-  }
-
-  public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
-    return borrowDataset(getInput(transform));
-  }
-
-  public Dataset borrowDataset(PValue pvalue) {
-    Dataset dataset = datasets.get(pvalue);
-    leaves.remove(dataset);
-    return dataset;
-  }
-
-  /**
-   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any actions (like
-   * saving to a file) registered on them (i.e. they are performed for side effects).
-   */
-  public void computeOutputs() {
-    for (Dataset dataset : leaves) {
-      dataset.action(); // force computation.
-    }
-  }
-
-  /**
-   * Retrieve an object of Type T associated with the PValue passed in.
-   *
-   * @param value PValue to retrieve associated data for.
-   * @param <T> Type of object to return.
-   * @return Native object.
-   */
-  @SuppressWarnings("TypeParameterUnusedInFormals")
-  public <T> T get(PValue value) {
-    if (pobjects.containsKey(value)) {
-      T result = (T) pobjects.get(value);
-      return result;
-    }
-    if (pcollections.containsKey(value)) {
-      JavaRDD<?> rdd = ((BoundedDataset) pcollections.get(value)).getRDD();
-      T res = (T) Iterables.getOnlyElement(rdd.collect());
-      pobjects.put(value, res);
-      return res;
-    }
-    throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
-  }
-
-  /**
-   * Return the current views creates in the pipeline.
-   *
-   * @return SparkPCollectionView
-   */
-  public SparkPCollectionView getPViews() {
-    return pviews;
-  }
-
-  /**
-   * Adds/Replaces a view to the current views creates in the pipeline.
-   *
-   * @param view - Identifier of the view
-   * @param value - Actual value of the view
-   * @param coder - Coder of the value
-   */
-  public void putPView(
-      PCollectionView<?> view,
-      Iterable<WindowedValue<?>> value,
-      Coder<Iterable<WindowedValue<?>>> coder) {
-    pviews.putPView(view, value, coder);
-  }
-
-  /**
-   * Get the map of cache candidates hold by the evaluation context.
-   *
-   * @return The current {@link Map} of cache candidates.
-   */
-  public Map<PCollection, Long> getCacheCandidates() {
-    return this.cacheCandidates;
-  }
-
-  <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
-    leaves.remove(boundedDataset);
-    return boundedDataset.getValues(pcollection);
-  }
-
-  public String storageLevel() {
-    return serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel();
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
new file mode 100644
index 0000000..f0ce1e5
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -0,0 +1,94 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.runners.core.construction.PipelineResources;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Does all the translation work: mode detection and node translation. */
+public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+
+  /**
+   * Local configurations work in the same JVM and have no problems with improperly formatted files
+   * on the classpath (eg. directories with .class files or empty directories). Prepare files for
+   * staging only when using a remote cluster (passing the master address explicitly).
+   */
+  public static void prepareFilesToStageForRemoteClusterExecution(SparkPipelineOptions options) {
+    if (!options.getSparkMaster().matches("local\\[?\\d*\\]?")) {
+      options.setFilesToStage(
+          PipelineResources.prepareFilesForStaging(
+              options.getFilesToStage(), options.getTempLocation()));
+    }
+  }
+
+  public static void replaceTransforms(Pipeline pipeline, SparkPipelineOptions options) {
+    pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(options.isStreaming()));
+  }
+
+  /**
+   * Visit the pipeline to determine the translation mode (batch/streaming) and update options
+   * accordingly.
+   */
+  public static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions options) {
+    TranslationModeDetector detector = new TranslationModeDetector();
+    pipeline.traverseTopologically(detector);
+    if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
+      // set streaming mode if it's a streaming pipeline
+      options.setStreaming(true);
+    }
+  }
+
+  /** Translates the pipeline; specialized by the batch and streaming subclasses. */
+  public void translate(Pipeline pipeline) {
+    // TODO: traverse the pipeline and translate each node
+  }
+
+  /** The translation mode of the Beam Pipeline. */
+  private enum TranslationMode {
+
+    /** Uses the batch mode. */
+    BATCH,
+
+    /** Uses the streaming mode. */
+    STREAMING
+  }
+
+  /** Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. */
+  private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
+
+    private TranslationMode translationMode;
+
+    TranslationModeDetector(TranslationMode defaultMode) {
+      this.translationMode = defaultMode;
+    }
+
+    TranslationModeDetector() {
+      this(TranslationMode.BATCH);
+    }
+
+    TranslationMode getTranslationMode() {
+      return translationMode;
+    }
+
+    @Override
+    public void visitValue(PValue value, TransformHierarchy.Node producer) {
+      if (translationMode.equals(TranslationMode.BATCH)) {
+        if (value instanceof PCollection
+            && ((PCollection) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
+          LOG.info(
+              "Found unbounded PCollection {}. Switching to streaming execution.", value.getName());
+          translationMode = TranslationMode.STREAMING;
+        }
+      }
+    }
+  }
+}
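To make the mode detection above concrete, a small sketch that is not part of the commit: it
assumes GenerateSequence from the Beam SDK is on the classpath (without an upper bound it
produces an unbounded PCollection), and the class name is hypothetical.

    import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
    import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class TranslationModeSketch {
      public static void main(String[] args) {
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        Pipeline p = Pipeline.create(options);
        // No .to(n), so the resulting PCollection<Long> is UNBOUNDED.
        p.apply(GenerateSequence.from(0));

        // The detector visits the unbounded PCollection and flips the options to
        // streaming, so translatePipeline() picks the StreamingPipelineTranslator.
        PipelineTranslator.detectTranslationMode(p, options);
        System.out.println(options.isStreaming()); // true
      }
    }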
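Likewise, the local-master test in prepareFilesToStageForRemoteClusterExecution can be checked
in isolation with plain JDK regex matching. One observation from the pattern itself: it only
admits digits between the brackets, so a master like "local[*]" would not match and would
trigger staging. An illustrative sketch, not part of the commit:

    public class LocalMasterRegexSketch {
      public static void main(String[] args) {
        // regex copied from prepareFilesToStageForRemoteClusterExecution
        String localMaster = "local\\[?\\d*\\]?";
        System.out.println("local".matches(localMaster));             // true  -> no staging
        System.out.println("local[4]".matches(localMaster));          // true  -> no staging
        System.out.println("local[*]".matches(localMaster));          // false -> files would be staged
        System.out.println("spark://host:7077".matches(localMaster)); // false -> files are staged
      }
    }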
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java
new file mode 100644
index 0000000..897ac01
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkTransformOverrides.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
+import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/** {@link PTransform} overrides for the Spark runner. */
+public class SparkTransformOverrides {
+  public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
+    ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
+    // TODO: [BEAM-5358] Support @RequiresStableInput on Spark runner
+    builder.add(
+        PTransformOverride.of(
+            PTransformMatchers.requiresStableInputParDoMulti(),
+            UnsupportedOverrideFactory.withMessage(
+                "Spark runner currently doesn't support @RequiresStableInput annotation.")));
+    if (!streaming) {
+      builder
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDo(), new SplittableParDo.OverrideFactory()))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.urnEqualTo(PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN),
+                  new SplittableParDoNaiveBounded.OverrideFactory()));
+    }
+    return builder.build();
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
new file mode 100644
index 0000000..2058b37
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
@@ -0,0 +1,5 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+public class StreamingPipelineTranslator extends PipelineTranslator {
+
+}
