This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 20d5bbd4e8d2d7d7b4dc9639d716b1e3403f91eb
Author: Alexey Romanenko <[email protected]>
AuthorDate: Fri Jul 19 15:48:32 2019 +0200

    Use "sparkMaster" in local mode to obtain number of shuffle partitions
    + spotless apply
---
 .../translation/TranslationContext.java                   | 15 +++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java           |  8 +++++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index f1bafd33..75f3ddf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -78,6 +78,21 @@ public class TranslationContext {
       sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
     }
 
+    // By default, Spark defines 200 as a number of sql partitions. This seems too much for local
+    // mode, so try to align with value of "sparkMaster" option in this case.
+    // We should not overwrite this value (or any user-defined spark configuration value) if the
+    // user has already configured it.
+    String sparkMaster = options.getSparkMaster();
+    if (sparkMaster != null
+        && sparkMaster.startsWith("local[")
+        && System.getProperty("spark.sql.shuffle.partitions") == null) {
+      int numPartitions =
+          Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1));
+      if (numPartitions > 0) {
+        sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions));
+      }
+    }
+
     this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     this.serializablePipelineOptions = new SerializablePipelineOptions(options);
     this.datasets = new HashMap<>();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 742c1b0..255adc8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -137,9 +137,11 @@ class ParDoTranslatorBatch<InputT, OutputT>
         pruneOutputFilteredByTag(context, allOutputs, output);
       }
     } else {
-      Dataset<WindowedValue<?>> outputDataset = allOutputs.map(
-          (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) value -> value._2,
-          EncoderHelpers.windowedValueEncoder());
+      Dataset<WindowedValue<?>> outputDataset =
+          allOutputs.map(
+              (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
+                  value -> value._2,
+              EncoderHelpers.windowedValueEncoder());
       context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
     }
   }

Reply via email to