This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 29e95af  Add SparkStructuredStreamingPipelineOptions and SparkCommonPipelineOptions - SparkStructuredStreamingPipelineOptions was added to have the new runner rely only on its specific options.
29e95af is described below

commit 29e95af93fc0812f71ab37c2307b211aa489ce28
Author: Ismaël Mejía <ieme...@gmail.com>
AuthorDate: Wed Apr 24 10:10:54 2019 +0200

    Add SparkStructuredStreamingPipelineOptions and SparkCommonPipelineOptions
    - SparkStructuredStreamingPipelineOptions was added to have the new
      runner rely only on its specific options.
    
    - SparkCommonPipelineOptions is used to share options common to all
      Spark runners.
---
 ...ptions.java => SparkCommonPipelineOptions.java} | 98 +++-------------------
 .../beam/runners/spark/SparkPipelineOptions.java   | 46 +---------
 .../SparkStructuredStreamingPipelineOptions.java   | 27 ++++++
 .../SparkStructuredStreamingRunner.java            | 21 ++---
 .../translation/PipelineTranslator.java            | 10 ++-
 .../translation/TranslationContext.java            |  8 +-
 .../translation/batch/DatasetSourceBatch.java      | 13 ++-
 .../translation/batch/PipelineTranslatorBatch.java |  4 +-
 .../streaming/DatasetSourceStreaming.java          | 10 +--
 .../streaming/PipelineTranslatorStreaming.java     |  4 +-
 .../translation/batch/CombineTest.java             |  3 +-
 .../translation/batch/ComplexSourceTest.java       |  3 +-
 .../translation/batch/FlattenTest.java             |  3 +-
 .../translation/batch/GroupByKeyTest.java          |  3 +-
 .../translation/batch/ParDoTest.java               |  3 +-
 .../translation/batch/SimpleSourceTest.java        |  3 +-
 .../translation/batch/WindowAssignTest.java        |  3 +-
 .../translation/streaming/SimpleSourceTest.java    |  3 +-
 18 files changed, 82 insertions(+), 183 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java
similarity index 51%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
copy to runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java
index 6935b54..b70bdea 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.spark;
 
 import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -28,9 +27,9 @@ import org.apache.beam.sdk.options.StreamingOptions;
 
 /**
  * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
- * master address, batch-interval, and other user-related knobs.
+ * master address, and other user-related knobs.
  */
-public interface SparkPipelineOptions
+public interface SparkCommonPipelineOptions
     extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
 
   @Description("The url of the spark master to connect to, (e.g. 
spark://host:port, local[4]).")
@@ -39,38 +38,6 @@ public interface SparkPipelineOptions
 
   void setSparkMaster(String master);
 
-  @Description("Batch interval for Spark streaming in milliseconds.")
-  @Default.Long(500)
-  Long getBatchIntervalMillis();
-
-  void setBatchIntervalMillis(Long batchInterval);
-
-  @Description("Batch default storage level")
-  @Default.String("MEMORY_ONLY")
-  String getStorageLevel();
-
-  void setStorageLevel(String storageLevel);
-
-  @Description("Minimum time to spend on read, for each micro-batch.")
-  @Default.Long(200)
-  Long getMinReadTimeMillis();
-
-  void setMinReadTimeMillis(Long minReadTimeMillis);
-
-  @Description("Max records per micro-batch. For streaming sources only.")
-  @Default.Long(-1)
-  Long getMaxRecordsPerBatch();
-
-  void setMaxRecordsPerBatch(Long maxRecordsPerBatch);
-
-  @Description(
-      "A value between 0-1 to describe the percentage of a micro-batch 
dedicated "
-          + "to reading from UnboundedSource.")
-  @Default.Double(0.1)
-  Double getReadTimePercentage();
-
-  void setReadTimePercentage(Double readTimePercentage);
-
   @Description(
       "A checkpoint directory for streaming resilience, ignored in batch. "
           + "For durability, a reliable filesystem such as HDFS/S3/GS is 
necessary.")
@@ -80,50 +47,6 @@ public interface SparkPipelineOptions
   void setCheckpointDir(String checkpointDir);
 
   /**
-   * Returns the default checkpoint directory of /tmp/${job.name}. For testing purposes only.
-   * Production applications should use a reliable filesystem such as HDFS/S3/GS.
-   */
-  class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
-    }
-  }
-
-  @Description(
-      "The period to checkpoint (in Millis). If not set, Spark will default "
-          + "to Max(slideDuration, Seconds(10)). This PipelineOptions default 
(-1) will end-up "
-          + "with the described Spark default.")
-  @Default.Long(-1)
-  Long getCheckpointDurationMillis();
-
-  void setCheckpointDurationMillis(Long durationMillis);
-
-  @Description(
-      "If set bundleSize will be used for splitting BoundedSources, otherwise 
default to "
-          + "splitting BoundedSources on Spark defaultParallelism. Most 
effective when used with "
-          + "Spark dynamicAllocation.")
-  @Default.Long(0)
-  Long getBundleSize();
-
-  @Experimental
-  void setBundleSize(Long value);
-
-  @Description("Enable/disable sending aggregator values to Spark's metric 
sinks")
-  @Default.Boolean(true)
-  Boolean getEnableSparkMetricSinks();
-
-  void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks);
-
-  @Description(
-      "If the spark runner will be initialized with a provided Spark Context. "
-          + "The Spark Context should be provided with SparkContextOptions.")
-  @Default.Boolean(false)
-  boolean getUsesProvidedSparkContext();
-
-  void setUsesProvidedSparkContext(boolean value);
-
-  /**
    * List of local files to make available to workers.
    *
    * <p>Jars are placed on the worker's classpath.
@@ -137,11 +60,14 @@ public interface SparkPipelineOptions
 
   void setFilesToStage(List<String> value);
 
-  @Description(
-      "Disable caching of reused PCollections for whole Pipeline."
-          + " It's useful when it's faster to recompute RDD rather than save. 
")
-  @Default.Boolean(false)
-  boolean isCacheDisabled();
-
-  void setCacheDisabled(boolean value);
+  /**
+   * Returns the default checkpoint directory of /tmp/${job.name}. For testing purposes only.
+   * Production applications should use a reliable filesystem such as HDFS/S3/GS.
+   */
+  class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
+    @Override
+    public String create(PipelineOptions options) {
+      return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
+    }
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 6935b54..a89c0dd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -17,27 +17,16 @@
  */
 package org.apache.beam.runners.spark;
 
-import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
 
 /**
  * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
  * master address, batch-interval, and other user-related knobs.
  */
-public interface SparkPipelineOptions
-    extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
-
-  @Description("The url of the spark master to connect to, (e.g. 
spark://host:port, local[4]).")
-  @Default.String("local[4]")
-  String getSparkMaster();
-
-  void setSparkMaster(String master);
+public interface SparkPipelineOptions extends SparkCommonPipelineOptions {
 
   @Description("Batch interval for Spark streaming in milliseconds.")
   @Default.Long(500)
@@ -72,25 +61,6 @@ public interface SparkPipelineOptions
   void setReadTimePercentage(Double readTimePercentage);
 
   @Description(
-      "A checkpoint directory for streaming resilience, ignored in batch. "
-          + "For durability, a reliable filesystem such as HDFS/S3/GS is 
necessary.")
-  @Default.InstanceFactory(TmpCheckpointDirFactory.class)
-  String getCheckpointDir();
-
-  void setCheckpointDir(String checkpointDir);
-
-  /**
-   * Returns the default checkpoint directory of /tmp/${job.name}. For testing purposes only.
-   * Production applications should use a reliable filesystem such as HDFS/S3/GS.
-   */
-  class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
-    }
-  }
-
-  @Description(
       "The period to checkpoint (in Millis). If not set, Spark will default "
           + "to Max(slideDuration, Seconds(10)). This PipelineOptions default 
(-1) will end-up "
           + "with the described Spark default.")
@@ -123,20 +93,6 @@ public interface SparkPipelineOptions
 
   void setUsesProvidedSparkContext(boolean value);
 
-  /**
-   * List of local files to make available to workers.
-   *
-   * <p>Jars are placed on the worker's classpath.
-   *
-   * <p>The default value is the list of jars from the main program's classpath.
-   */
-  @Description(
-      "Jar-Files to send to all workers and put on the classpath. "
-          + "The default value is all files from the classpath.")
-  List<String> getFilesToStage();
-
-  void setFilesToStage(List<String> value);
-
   @Description(
       "Disable caching of reused PCollections for whole Pipeline."
           + " It's useful when it's faster to recompute RDD rather than save. 
")
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
new file mode 100644
index 0000000..b115d9b
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
+ * master address, and other user-related knobs.
+ */
+public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 80940ad..acb5615 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.spark.structuredstreaming;
 
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
@@ -44,8 +43,8 @@ import org.slf4j.LoggerFactory;
  * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
  * we would do the following:
  *
- * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options =
- * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineOptions
+ * options = SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
  * SparkPipelineResult result = (SparkPipelineResult) p.run(); }
  */
 public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPipelineResult> {
@@ -53,7 +52,7 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi
   private static final Logger LOG = LoggerFactory.getLogger(SparkStructuredStreamingRunner.class);
 
   /** Options used in this pipeline runner. */
-  private final SparkPipelineOptions options;
+  private final SparkStructuredStreamingPipelineOptions options;
 
   /**
    * Creates and returns a new SparkStructuredStreamingRunner with default options. In particular,
@@ -62,7 +61,8 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi
    * @return A pipeline runner with default options.
    */
   public static SparkStructuredStreamingRunner create() {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    SparkStructuredStreamingPipelineOptions options =
+        PipelineOptionsFactory.as(SparkStructuredStreamingPipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     return new SparkStructuredStreamingRunner(options);
   }
@@ -70,10 +70,11 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi
   /**
    * Creates and returns a new SparkStructuredStreamingRunner with specified options.
    *
-   * @param options The SparkPipelineOptions to use when executing the job.
+   * @param options The SparkStructuredStreamingPipelineOptions to use when executing the job.
    * @return A pipeline runner that will execute with specified options.
    */
-  public static SparkStructuredStreamingRunner create(SparkPipelineOptions options) {
+  public static SparkStructuredStreamingRunner create(
+      SparkStructuredStreamingPipelineOptions options) {
     return new SparkStructuredStreamingRunner(options);
   }
 
@@ -84,8 +85,8 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi
    * @return A pipeline runner that will execute with specified options.
    */
   public static SparkStructuredStreamingRunner fromOptions(PipelineOptions options) {
-    SparkPipelineOptions sparkOptions =
-        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+    SparkStructuredStreamingPipelineOptions sparkOptions =
+        PipelineOptionsValidator.validate(SparkStructuredStreamingPipelineOptions.class, options);
 
     if (sparkOptions.getFilesToStage() == null) {
       sparkOptions.setFilesToStage(
@@ -105,7 +106,7 @@ public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPi
    * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
    * thread.
    */
-  private SparkStructuredStreamingRunner(SparkPipelineOptions options) {
+  private SparkStructuredStreamingRunner(SparkStructuredStreamingPipelineOptions options) {
     this.options = options;
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index 8a7afb9..b623e55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -19,10 +19,11 @@ package org.apache.beam.runners.spark.structuredstreaming.translation;
 
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineResources;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
 import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -50,7 +51,8 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
    * on classpath (eg. directories with .class files or empty directories). Prepare files for
    * staging only when using remote cluster (passing the master address explicitly).
    */
-  public static void prepareFilesToStageForRemoteClusterExecution(SparkPipelineOptions options) {
+  public static void prepareFilesToStageForRemoteClusterExecution(
+      SparkStructuredStreamingPipelineOptions options) {
     if (!options.getSparkMaster().matches("local\\[?\\d*]?")) {
       options.setFilesToStage(
           PipelineResources.prepareFilesForStaging(
@@ -58,7 +60,7 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
     }
   }
 
-  public static void replaceTransforms(Pipeline pipeline, SparkPipelineOptions options) {
+  public static void replaceTransforms(Pipeline pipeline, StreamingOptions options) {
     pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(options.isStreaming()));
   }
 
@@ -66,7 +68,7 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
    * Visit the pipeline to determine the translation mode (batch/streaming) and update options
    * accordingly.
    */
-  public static void detectTranslationMode(Pipeline pipeline, SparkPipelineOptions options) {
+  public static void detectTranslationMode(Pipeline pipeline, StreamingOptions options) {
     TranslationModeDetector detector = new TranslationModeDetector();
     pipeline.traverseTopologically(detector);
     if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index ad4724f..4d17120 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -26,7 +26,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TransformInputs;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -69,7 +69,7 @@ public class TranslationContext {
 
   private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
 
-  public TranslationContext(SparkPipelineOptions options) {
+  public TranslationContext(SparkStructuredStreamingPipelineOptions options) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.setMaster(options.getSparkMaster());
     sparkConf.setAppName(options.getAppName());
@@ -182,8 +182,8 @@ public class TranslationContext {
   /** Starts a new pipeline. */
   public void startPipeline(boolean testMode) {
     try {
-      SparkPipelineOptions options =
-          serializablePipelineOptions.get().as(SparkPipelineOptions.class);
+      SparkStructuredStreamingPipelineOptions options =
+          serializablePipelineOptions.get().as(SparkStructuredStreamingPipelineOptions.class);
       for (Dataset<?> dataset : leaves) {
         if (options.isStreaming()) {
           // TODO: deal with Beam Discarding, Accumulating and Accumulating & Retracting outputmodes
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index e2cff1f..22f3e35 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -25,11 +25,11 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
@@ -93,14 +93,12 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
 
     @Override
     public List<InputPartition<InternalRow>> planInputPartitions() {
-      SparkPipelineOptions sparkPipelineOptions =
-          serializablePipelineOptions.get().as(SparkPipelineOptions.class);
+      PipelineOptions options = serializablePipelineOptions.get();
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       try {
-        desiredSizeBytes = source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions;
-        List<? extends BoundedSource<T>> splits =
-            source.split(desiredSizeBytes, sparkPipelineOptions);
+        desiredSizeBytes = source.getEstimatedSizeBytes(options) / numPartitions;
+        List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, options);
         for (BoundedSource<T> split : splits) {
           result.add(
               (InputPartition<InternalRow>)
@@ -129,8 +127,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
       this.source = source;
       // reader is not serializable so lazy initialize it
       try {
-        reader =
-            source.createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
+        reader = source.createReader(serializablePipelineOptions.get().as(PipelineOptions.class));
       } catch (IOException e) {
         throw new RuntimeException("Error creating BoundedReader ", e);
       }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 0ab7a49..7f7d962 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
@@ -78,7 +78,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
         View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
   }
 
-  public PipelineTranslatorBatch(SparkPipelineOptions options) {
+  public PipelineTranslatorBatch(SparkStructuredStreamingPipelineOptions options) {
     translationContext = new TranslationContext(options);
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 9270b9a..c4371cd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -29,11 +29,11 @@ import java.util.List;
 import java.util.Optional;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
@@ -159,12 +159,11 @@ class DatasetSourceStreaming implements DataSourceV2, MicroBatchReadSupport {
 
     @Override
     public List<InputPartition<InternalRow>> planInputPartitions() {
-      SparkPipelineOptions sparkPipelineOptions =
-          serializablePipelineOptions.get().as(SparkPipelineOptions.class);
+      PipelineOptions options = serializablePipelineOptions.get();
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       try {
         List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
-            source.split(numPartitions, sparkPipelineOptions);
+            source.split(numPartitions, options);
         for (UnboundedSource<T, CheckpointMarkT> split : splits) {
           result.add(
               (InputPartition<InternalRow>)
@@ -236,8 +235,7 @@ class DatasetSourceStreaming implements DataSourceV2, MicroBatchReadSupport {
             // https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-spark-structured-streaming/
             // "Structured Streaming stores and retrieves the offsets on our behalf when re-running
             // the application meaning we no longer have to store them externally."
-            source.createReader(
-                serializablePipelineOptions.get().as(SparkPipelineOptions.class), null);
+            source.createReader(serializablePipelineOptions.get(), null);
       } catch (IOException e) {
         throw new RuntimeException("Error creating UnboundedReader ", e);
       }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
index 3f47482..890bfb5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
@@ -71,7 +71,7 @@ public class PipelineTranslatorStreaming extends PipelineTranslator {
     //        .put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
   }
 
-  public PipelineTranslatorStreaming(SparkPipelineOptions options) {
+  public PipelineTranslatorStreaming(SparkStructuredStreamingPipelineOptions options) {
     translationContext = new TranslationContext(options);
   }
 
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
index fbceb11..bd5df66 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -41,7 +40,7 @@ public class CombineTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
index ed333ce..8e2e224 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
@@ -25,7 +25,6 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -48,7 +47,7 @@ public class ComplexSourceTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
     file = TEMPORARY_FOLDER.newFile();
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
index 799178d..0b1ac96 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -39,7 +38,7 @@ public class FlattenTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
index eaa7a44..7a64492 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,7 +41,7 @@ public class GroupByKeyTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
index 6c5fe74..1388572 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -44,7 +43,7 @@ public class ParDoTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
index aa4eca4..5fd1b77 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
 import org.apache.beam.sdk.Pipeline;
@@ -50,7 +49,7 @@ public class SimpleSourceTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 05ea2e7..7c4f963 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -40,7 +39,7 @@ public class WindowAssignTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
   }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java
index b590b90..8bee646 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -40,7 +39,7 @@ public class SimpleSourceTest implements Serializable {
 
   @BeforeClass
   public static void beforeClass() {
-    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
     options.setRunner(SparkStructuredStreamingRunner.class);
     pipeline = Pipeline.create(options);
   }
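
Since both SparkPipelineOptions and SparkStructuredStreamingPipelineOptions now
extend SparkCommonPipelineOptions, shared settings can be driven through the
common interface for either runner. A minimal sketch (illustrative only; the
master URL and checkpoint path below are placeholder values):

    import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // The classic RDD runner keeps its specific knobs...
    SparkPipelineOptions rddOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    rddOptions.setBatchIntervalMillis(1000L);                // RDD-runner-specific
    rddOptions.setSparkMaster("spark://host:7077");          // shared, from SparkCommonPipelineOptions
    rddOptions.setCheckpointDir("hdfs:///beam/checkpoints"); // shared, from SparkCommonPipelineOptions

    // ...while code that needs only the shared subset can target the common interface.
    SparkCommonPipelineOptions common = rddOptions.as(SparkCommonPipelineOptions.class);
    System.out.println(common.getSparkMaster());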
