This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 06f9ff8 [BEAM-4783] Fix issues created in #6181. new d3709b0 Merge pull request #7690: [BEAM-4783] Fix issues created in #6181 06f9ff8 is described below commit 06f9ff8c7264dfc890788acd03dd762837d0ddce Author: Kyle Winkelman <kyle.winkel...@optum.com> AuthorDate: Thu Jan 31 10:52:12 2019 -0600 [BEAM-4783] Fix issues created in #6181. --- .../spark/translation/TransformTranslator.java | 21 +++++++++++++-------- .../streaming/StreamingTransformTranslator.java | 7 ++++++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index a555e6e..c6fc434 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -133,14 +133,8 @@ public final class TransformTranslator { WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder()); // --- group by key only. - Long bundleSize = - context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize(); - Partitioner partitioner = - (bundleSize > 0) - ? new HashPartitioner(context.getSparkContext().defaultParallelism()) - : null; JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey = - GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner); + GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, getPartitioner(context)); // --- now group also by window. // for batch, GroupAlsoByWindow uses an in-memory StateInternals. 
@@ -377,6 +371,7 @@ public final class TransformTranslator { (KvCoder) context.getInput(transform).getCoder(), windowingStrategy.getWindowFn().windowCoder(), (JavaRDD) inRDD, + getPartitioner(context), (MultiDoFnFunction) multiDoFnFunction); } else { all = inRDD.mapPartitionsToPair(multiDoFnFunction); @@ -420,6 +415,7 @@ public final class TransformTranslator { KvCoder<K, V> kvCoder, Coder<? extends BoundedWindow> windowCoder, JavaRDD<WindowedValue<KV<K, V>>> kvInRDD, + Partitioner partitioner, MultiDoFnFunction<KV<K, V>, OutputT> doFnFunction) { Coder<K> keyCoder = kvCoder.getKeyCoder(); @@ -427,7 +423,7 @@ public final class TransformTranslator { WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), windowCoder); JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupRDD = - GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, null); + GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); return groupRDD .map( @@ -550,6 +546,15 @@ public final class TransformTranslator { }; } + @Nullable + private static Partitioner getPartitioner(EvaluationContext context) { + Long bundleSize = + context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize(); + return (bundleSize > 0) + ? 
null + : new HashPartitioner(context.getSparkContext().defaultParallelism()); + } + private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>(); static { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 067a95c..5238f19 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; import org.apache.spark.Accumulator; +import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext$; @@ -304,7 +305,11 @@ public final class StreamingTransformTranslator { JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKeyStream = dStream.transform( rdd -> - GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, null)); + GroupCombineFunctions.groupByKeyOnly( + rdd, + coder.getKeyCoder(), + wvCoder, + new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism()))); // --- now group also by window. JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =