This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 20d5bbd4e8d2d7d7b4dc9639d716b1e3403f91eb
Author: Alexey Romanenko <[email protected]>
AuthorDate: Fri Jul 19 15:48:32 2019 +0200

    Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply
---
 .../translation/TranslationContext.java           | 15 +++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java   |  8 +++++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index f1bafd33..75f3ddf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -78,6 +78,21 @@ public class TranslationContext {
       sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
     }
 
+    // By default, Spark defines 200 as a number of sql partitions. This seems too much for local
+    // mode, so try to align with value of "sparkMaster" option in this case.
+    // We should not overwrite this value (or any user-defined spark configuration value) if the
+    // user has already configured it.
+    String sparkMaster = options.getSparkMaster();
+    if (sparkMaster != null
+        && sparkMaster.startsWith("local[")
+        && System.getProperty("spark.sql.shuffle.partitions") == null) {
+      int numPartitions =
+          Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1));
+      if (numPartitions > 0) {
+        sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions));
+      }
+    }
+
     this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     this.serializablePipelineOptions = new SerializablePipelineOptions(options);
     this.datasets = new HashMap<>();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 742c1b0..255adc8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -137,9 +137,11 @@ class ParDoTranslatorBatch<InputT, OutputT>
         pruneOutputFilteredByTag(context, allOutputs, output);
       }
     } else {
-      Dataset<WindowedValue<?>> outputDataset = allOutputs.map(
-          (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) value -> value._2,
-          EncoderHelpers.windowedValueEncoder());
+      Dataset<WindowedValue<?>> outputDataset =
+          allOutputs.map(
+              (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
+                  value -> value._2,
+              EncoderHelpers.windowedValueEncoder());
       context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
     }
   }
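For context, the TranslationContext change above derives the shuffle-partition
count from the master URL. Below is a minimal standalone sketch of that
"local[N]" parsing, assuming a bracketed numeric master such as "local[4]";
the class and helper names are hypothetical, for illustration only, and are
not part of the commit.

    // Hypothetical standalone sketch mirroring the "local[N]" parsing
    // in the TranslationContext diff above; not part of the commit.
    public class LocalPartitionsSketch {
      static int deriveShufflePartitions(String sparkMaster) {
        if (sparkMaster != null && sparkMaster.startsWith("local[")) {
          // "local[4]" -> "4". Like the committed code, this handles only
          // numeric bracket values; "local[*]" would throw NumberFormatException.
          String n = sparkMaster.substring("local[".length(), sparkMaster.length() - 1);
          return Integer.parseInt(n);
        }
        return -1; // non-local or unbracketed master: keep Spark's default of 200
      }

      public static void main(String[] args) {
        System.out.println(deriveShufflePartitions("local[4]")); // prints 4
      }
    }

With the runner's sparkMaster option set to "local[4]", the context would thus
configure spark.sql.shuffle.partitions to 4, unless the user has already set
that property.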
