This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 29e95af Add SparkStructuredStreamingPipelineOptions and SparkCommonPipelineOptions - SparkStructuredStreamingPipelineOptions was added to have the new runner rely only on its specific options. 29e95af is described below commit 29e95af93fc0812f71ab37c2307b211aa489ce28 Author: Ismaël Mejía <ieme...@gmail.com> AuthorDate: Wed Apr 24 10:10:54 2019 +0200 Add SparkStructuredStreamingPipelineOptions and SparkCommonPipelineOptions - SparkStructuredStreamingPipelineOptions was added to have the new runner rely only on its specific options. - SparkCommonPipelineOptions is used to share options common to all Spark runners. --- ...ptions.java => SparkCommonPipelineOptions.java} | 98 +++------------------- .../beam/runners/spark/SparkPipelineOptions.java | 46 +--------- .../SparkStructuredStreamingPipelineOptions.java | 27 ++++++ .../SparkStructuredStreamingRunner.java | 21 ++--- .../translation/PipelineTranslator.java | 10 ++- .../translation/TranslationContext.java | 8 +- .../translation/batch/DatasetSourceBatch.java | 13 ++- .../translation/batch/PipelineTranslatorBatch.java | 4 +- .../streaming/DatasetSourceStreaming.java | 10 +-- .../streaming/PipelineTranslatorStreaming.java | 4 +- .../translation/batch/CombineTest.java | 3 +- .../translation/batch/ComplexSourceTest.java | 3 +- .../translation/batch/FlattenTest.java | 3 +- .../translation/batch/GroupByKeyTest.java | 3 +- .../translation/batch/ParDoTest.java | 3 +- .../translation/batch/SimpleSourceTest.java | 3 +- .../translation/batch/WindowAssignTest.java | 3 +- .../translation/streaming/SimpleSourceTest.java | 3 +- 18 files changed, 82 insertions(+), 183 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java similarity index 51% copy from 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java copy to runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java index 6935b54..b70bdea 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark; import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -28,9 +27,9 @@ import org.apache.beam.sdk.options.StreamingOptions; /** * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the - * master address, batch-interval, and other user-related knobs. + * master address, and other user-related knobs. */ -public interface SparkPipelineOptions +public interface SparkCommonPipelineOptions extends PipelineOptions, StreamingOptions, ApplicationNameOptions { @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).") @@ -39,38 +38,6 @@ public interface SparkPipelineOptions void setSparkMaster(String master); - @Description("Batch interval for Spark streaming in milliseconds.") - @Default.Long(500) - Long getBatchIntervalMillis(); - - void setBatchIntervalMillis(Long batchInterval); - - @Description("Batch default storage level") - @Default.String("MEMORY_ONLY") - String getStorageLevel(); - - void setStorageLevel(String storageLevel); - - @Description("Minimum time to spend on read, for each micro-batch.") - @Default.Long(200) - Long getMinReadTimeMillis(); - - void setMinReadTimeMillis(Long minReadTimeMillis); - - @Description("Max records per micro-batch. 
For streaming sources only.") - @Default.Long(-1) - Long getMaxRecordsPerBatch(); - - void setMaxRecordsPerBatch(Long maxRecordsPerBatch); - - @Description( - "A value between 0-1 to describe the percentage of a micro-batch dedicated " - + "to reading from UnboundedSource.") - @Default.Double(0.1) - Double getReadTimePercentage(); - - void setReadTimePercentage(Double readTimePercentage); - @Description( "A checkpoint directory for streaming resilience, ignored in batch. " + "For durability, a reliable filesystem such as HDFS/S3/GS is necessary.") @@ -80,50 +47,6 @@ public interface SparkPipelineOptions void setCheckpointDir(String checkpointDir); /** - * Returns the default checkpoint directory of /tmp/${job.name}. For testing purposes only. - * Production applications should use a reliable filesystem such as HDFS/S3/GS. - */ - class TmpCheckpointDirFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName(); - } - } - - @Description( - "The period to checkpoint (in Millis). If not set, Spark will default " - + "to Max(slideDuration, Seconds(10)). This PipelineOptions default (-1) will end-up " - + "with the described Spark default.") - @Default.Long(-1) - Long getCheckpointDurationMillis(); - - void setCheckpointDurationMillis(Long durationMillis); - - @Description( - "If set bundleSize will be used for splitting BoundedSources, otherwise default to " - + "splitting BoundedSources on Spark defaultParallelism. 
Most effective when used with " - + "Spark dynamicAllocation.") - @Default.Long(0) - Long getBundleSize(); - - @Experimental - void setBundleSize(Long value); - - @Description("Enable/disable sending aggregator values to Spark's metric sinks") - @Default.Boolean(true) - Boolean getEnableSparkMetricSinks(); - - void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks); - - @Description( - "If the spark runner will be initialized with a provided Spark Context. " - + "The Spark Context should be provided with SparkContextOptions.") - @Default.Boolean(false) - boolean getUsesProvidedSparkContext(); - - void setUsesProvidedSparkContext(boolean value); - - /** * List of local files to make available to workers. * * <p>Jars are placed on the worker's classpath. @@ -137,11 +60,14 @@ public interface SparkPipelineOptions void setFilesToStage(List<String> value); - @Description( - "Disable caching of reused PCollections for whole Pipeline." - + " It's useful when it's faster to recompute RDD rather than save. ") - @Default.Boolean(false) - boolean isCacheDisabled(); - - void setCacheDisabled(boolean value); + /** + * Returns the default checkpoint directory of /tmp/${job.name}. For testing purposes only. + * Production applications should use a reliable filesystem such as HDFS/S3/GS. 
+ */ + class TmpCheckpointDirFactory implements DefaultValueFactory<String> { + @Override + public String create(PipelineOptions options) { + return "/tmp/" + options.as(SparkCommonPipelineOptions.class).getJobName(); + } + } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 6935b54..a89c0dd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -17,27 +17,16 @@ */ package org.apache.beam.runners.spark; -import java.util.List; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.StreamingOptions; /** * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the * master address, batch-interval, and other user-related knobs. */ -public interface SparkPipelineOptions - extends PipelineOptions, StreamingOptions, ApplicationNameOptions { - - @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).") - @Default.String("local[4]") - String getSparkMaster(); - - void setSparkMaster(String master); +public interface SparkPipelineOptions extends SparkCommonPipelineOptions { @Description("Batch interval for Spark streaming in milliseconds.") @Default.Long(500) @@ -72,25 +61,6 @@ public interface SparkPipelineOptions void setReadTimePercentage(Double readTimePercentage); @Description( - "A checkpoint directory for streaming resilience, ignored in batch. 
" - + "For durability, a reliable filesystem such as HDFS/S3/GS is necessary.") - @Default.InstanceFactory(TmpCheckpointDirFactory.class) - String getCheckpointDir(); - - void setCheckpointDir(String checkpointDir); - - /** - * Returns the default checkpoint directory of /tmp/${job.name}. For testing purposes only. - * Production applications should use a reliable filesystem such as HDFS/S3/GS. - */ - class TmpCheckpointDirFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName(); - } - } - - @Description( "The period to checkpoint (in Millis). If not set, Spark will default " + "to Max(slideDuration, Seconds(10)). This PipelineOptions default (-1) will end-up " + "with the described Spark default.") @@ -123,20 +93,6 @@ public interface SparkPipelineOptions void setUsesProvidedSparkContext(boolean value); - /** - * List of local files to make available to workers. - * - * <p>Jars are placed on the worker's classpath. - * - * <p>The default value is the list of jars from the main program's classpath. - */ - @Description( - "Jar-Files to send to all workers and put on the classpath. " - + "The default value is all files from the classpath.") - List<String> getFilesToStage(); - - void setFilesToStage(List<String> value); - @Description( "Disable caching of reused PCollections for whole Pipeline." + " It's useful when it's faster to recompute RDD rather than save. 
") diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java new file mode 100644 index 0000000..b115d9b --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.structuredstreaming; + +import org.apache.beam.runners.spark.SparkCommonPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the + * master address, and other user-related knobs. 
+ */ +public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java index 80940ad..acb5615 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.spark.structuredstreaming; import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch; @@ -44,8 +43,8 @@ import org.slf4j.LoggerFactory; * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url * we would do the following: * - * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options = - * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port"); + * <p>{@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineOptions + * options = SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port"); * SparkPipelineResult result = (SparkPipelineResult) p.run(); } */ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPipelineResult> { @@ -53,7 +52,7 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi private static final Logger LOG = 
LoggerFactory.getLogger(SparkStructuredStreamingRunner.class); /** Options used in this pipeline runner. */ - private final SparkPipelineOptions options; + private final SparkStructuredStreamingPipelineOptions options; /** * Creates and returns a new SparkStructuredStreamingRunner with default options. In particular, @@ -62,7 +61,8 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi * @return A pipeline runner with default options. */ public static SparkStructuredStreamingRunner create() { - SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + SparkStructuredStreamingPipelineOptions options = + PipelineOptionsFactory.as(SparkStructuredStreamingPipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); return new SparkStructuredStreamingRunner(options); } @@ -70,10 +70,11 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi /** * Creates and returns a new SparkStructuredStreamingRunner with specified options. * - * @param options The SparkPipelineOptions to use when executing the job. + * @param options The SparkStructuredStreamingPipelineOptions to use when executing the job. * @return A pipeline runner that will execute with specified options. */ - public static SparkStructuredStreamingRunner create(SparkPipelineOptions options) { + public static SparkStructuredStreamingRunner create( + SparkStructuredStreamingPipelineOptions options) { return new SparkStructuredStreamingRunner(options); } @@ -84,8 +85,8 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi * @return A pipeline runner that will execute with specified options. 
*/ public static SparkStructuredStreamingRunner fromOptions(PipelineOptions options) { - SparkPipelineOptions sparkOptions = - PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); + SparkStructuredStreamingPipelineOptions sparkOptions = + PipelineOptionsValidator.validate(SparkStructuredStreamingPipelineOptions.class, options); if (sparkOptions.getFilesToStage() == null) { sparkOptions.setFilesToStage( @@ -105,7 +106,7 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single * thread. */ - private SparkStructuredStreamingRunner(SparkPipelineOptions options) { + private SparkStructuredStreamingRunner(SparkStructuredStreamingPipelineOptions options) { this.options = options; } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java index 8a7afb9..b623e55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java @@ -19,10 +19,11 @@ package org.apache.beam.runners.spark.structuredstreaming.translation; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.PipelineResources; -import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch; import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming; import org.apache.beam.sdk.Pipeline; +import 
org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; @@ -50,7 +51,8 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul * on classpath (eg. directories with .class files or empty directories). Prepare files for * staging only when using remote cluster (passing the master address explicitly). */ - public static void prepareFilesToStageForRemoteClusterExecution(SparkPipelineOptions options) { + public static void prepareFilesToStageForRemoteClusterExecution( + SparkStructuredStreamingPipelineOptions options) { if (!options.getSparkMaster().matches("local\\[?\\d*]?")) { options.setFilesToStage( PipelineResources.prepareFilesForStaging( @@ -58,7 +60,7 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul } } - public static void replaceTransforms(Pipeline pipeline, SparkPipelineOptions options) { + public static void replaceTransforms(Pipeline pipeline, StreamingOptions options) { pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(options.isStreaming())); } @@ -66,7 +68,7 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul * Visit the pipeline to determine the translation mode (batch/streaming) and update options * accordingly. 
*/ - public static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions options) { + public static void detectTranslationMode(Pipeline pipeline, StreamingOptions options) { TranslationModeDetector detector = new TranslationModeDetector(); pipeline.traverseTopologically(detector); if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index ad4724f..4d17120 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -26,7 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TransformInputs; -import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -69,7 +69,7 @@ public class TranslationContext { private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets; - public TranslationContext(SparkPipelineOptions options) { + public TranslationContext(SparkStructuredStreamingPipelineOptions options) { SparkConf sparkConf = new SparkConf(); sparkConf.setMaster(options.getSparkMaster()); sparkConf.setAppName(options.getAppName()); @@ -182,8 +182,8 @@ public class TranslationContext { /** Starts a new pipeline. 
*/ public void startPipeline(boolean testMode) { try { - SparkPipelineOptions options = - serializablePipelineOptions.get().as(SparkPipelineOptions.class); + SparkStructuredStreamingPipelineOptions options = + serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class); for (Dataset<?> dataset : leaves) { if (options.isStreaming()) { // TODO: deal with Beam Discarding, Accumulating and Accumulating & Retracting outputmodes diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java index e2cff1f..22f3e35 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java @@ -25,11 +25,11 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.serialization.Base64Serializer; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.ContinuousReadSupport; @@ -93,14 +93,12 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { @Override public List<InputPartition<InternalRow>> planInputPartitions() { - SparkPipelineOptions sparkPipelineOptions = - 
serializablePipelineOptions.get().as(SparkPipelineOptions.class); + PipelineOptions options = serializablePipelineOptions.get(); List<InputPartition<InternalRow>> result = new ArrayList<>(); long desiredSizeBytes; try { - desiredSizeBytes = source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions; - List<? extends BoundedSource<T>> splits = - source.split(desiredSizeBytes, sparkPipelineOptions); + desiredSizeBytes = source.getEstimatedSizeBytes(options) / numPartitions; + List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, options); for (BoundedSource<T> split : splits) { result.add( (InputPartition<InternalRow>) @@ -129,8 +127,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { this.source = source; // reader is not serializable so lazy initialize it try { - reader = - source.createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class)); + reader = source.createReader(serializablePipelineOptions.get().as(PipelineOptions.class)); } catch (IOException e) { throw new RuntimeException("Error creating BoundedReader ", e); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java index 0ab7a49..7f7d962 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.spark.SparkPipelineOptions; +import 
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; @@ -78,7 +78,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator { View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); } - public PipelineTranslatorBatch(SparkPipelineOptions options) { + public PipelineTranslatorBatch(SparkStructuredStreamingPipelineOptions options) { translationContext = new TranslationContext(options); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java index 9270b9a..c4371cd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java @@ -29,11 +29,11 @@ import java.util.List; import java.util.Optional; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.serialization.Base64Serializer; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.sql.catalyst.InternalRow; import 
org.apache.spark.sql.sources.v2.ContinuousReadSupport; @@ -159,12 +159,11 @@ class DatasetSourceStreaming implements DataSourceV2, MicroBatchReadSupport { @Override public List<InputPartition<InternalRow>> planInputPartitions() { - SparkPipelineOptions sparkPipelineOptions = - serializablePipelineOptions.get().as(SparkPipelineOptions.class); + PipelineOptions options = serializablePipelineOptions.get(); List<InputPartition<InternalRow>> result = new ArrayList<>(); try { List<? extends UnboundedSource<T, CheckpointMarkT>> splits = - source.split(numPartitions, sparkPipelineOptions); + source.split(numPartitions, options); for (UnboundedSource<T, CheckpointMarkT> split : splits) { result.add( (InputPartition<InternalRow>) @@ -236,8 +235,7 @@ class DatasetSourceStreaming implements DataSourceV2, MicroBatchReadSupport { // https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-spark-structured-streaming/ // "Structured Streaming stores and retrieves the offsets on our behalf when re-running // the application meaning we no longer have to store them externally." 
- source.createReader( - serializablePipelineOptions.get().as(SparkPipelineOptions.class), null); + source.createReader(serializablePipelineOptions.get(), null); } catch (IOException e) { throw new RuntimeException("Error creating UnboundedReader ", e); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java index 3f47482..890bfb5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.streaming; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; @@ -71,7 +71,7 @@ public class PipelineTranslatorStreaming extends PipelineTranslator { // .put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); } - public PipelineTranslatorStreaming(SparkPipelineOptions options) { + public PipelineTranslatorStreaming(SparkStructuredStreamingPipelineOptions options) { translationContext = new TranslationContext(options); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java index fbceb11..bd5df66 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -41,7 +40,7 @@ public class CombineTest implements Serializable { @BeforeClass public static void beforeClass() { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java index ed333ce..8e2e224 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java @@ -25,7 +25,6 @@ import java.io.PrintStream; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; 
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; @@ -48,7 +47,7 @@ public class ComplexSourceTest implements Serializable { @BeforeClass public static void beforeClass() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); file = TEMPORARY_FOLDER.newFile(); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java index 799178d..0b1ac96 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import java.io.Serializable; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -39,7 +38,7 @@ public class FlattenTest implements Serializable { @BeforeClass public static void beforeClass() { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java index eaa7a44..7a64492 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -42,7 +41,7 @@ public class GroupByKeyTest implements Serializable { @BeforeClass public static void beforeClass() { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java index 6c5fe74..1388572 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import java.io.Serializable; import java.util.List; import java.util.Map; -import org.apache.beam.runners.spark.SparkPipelineOptions; import 
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -44,7 +43,7 @@ public class ParDoTest implements Serializable { @BeforeClass public static void beforeClass() { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java index aa4eca4..5fd1b77 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.serialization.Base64Serializer; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger; import org.apache.beam.sdk.Pipeline; @@ -50,7 +49,7 @@ public class SimpleSourceTest implements Serializable { @BeforeClass public static void beforeClass() { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); } diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java index 05ea2e7..7c4f963 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import java.io.Serializable; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -40,7 +39,7 @@ public class WindowAssignTest implements Serializable { @BeforeClass public static void beforeClass() { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java index b590b90..8bee646 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.streaming; import java.io.Serializable; -import 
org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; @@ -40,7 +39,7 @@ public class SimpleSourceTest implements Serializable { @BeforeClass public static void beforeClass() { - PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); pipeline = Pipeline.create(options); }