This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 06f9ff8  [BEAM-4783] Fix issues created in #6181.
     new d3709b0  Merge pull request #7690: [BEAM-4783] Fix issues created in #6181
06f9ff8 is described below

commit 06f9ff8c7264dfc890788acd03dd762837d0ddce
Author: Kyle Winkelman <kyle.winkel...@optum.com>
AuthorDate: Thu Jan 31 10:52:12 2019 -0600

    [BEAM-4783] Fix issues created in #6181.
---
 .../spark/translation/TransformTranslator.java      | 21 +++++++++++++--------
 .../streaming/StreamingTransformTranslator.java     |  7 ++++++-
 2 files changed, 19 insertions(+), 9 deletions(-)
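
Note on the change: the batch translator keys its partitioner off
SparkPipelineOptions.getBundleSize(), now via a shared getPartitioner helper
(see the diff below). A minimal sketch of setting that option from user code,
assuming the setBundleSize setter matching the getter used in the diff; the
pipeline construction itself is elided:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BundleSizeSketch {
      public static void main(String[] args) {
        SparkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
        // A non-zero bundle size makes getPartitioner return null, so
        // groupByKeyOnly keeps the source's own bundling; the default of 0
        // falls back to a HashPartitioner sized by defaultParallelism.
        options.setBundleSize(64L * 1024 * 1024); // illustrative value, in bytes
        options.setRunner(SparkRunner.class);
        Pipeline pipeline = Pipeline.create(options);
        // ... apply transforms, then pipeline.run() ...
      }
    }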

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a555e6e..c6fc434 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -133,14 +133,8 @@ public final class TransformTranslator {
             WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
 
         // --- group by key only.
-        Long bundleSize =
-            context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize();
-        Partitioner partitioner =
-            (bundleSize > 0)
-                ? new HashPartitioner(context.getSparkContext().defaultParallelism())
-                : null;
         JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
-            GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner);
+            GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, getPartitioner(context));
 
         // --- now group also by window.
         // for batch, GroupAlsoByWindow uses an in-memory StateInternals.
@@ -377,6 +371,7 @@ public final class TransformTranslator {
                   (KvCoder) context.getInput(transform).getCoder(),
                   windowingStrategy.getWindowFn().windowCoder(),
                   (JavaRDD) inRDD,
+                  getPartitioner(context),
                   (MultiDoFnFunction) multiDoFnFunction);
         } else {
           all = inRDD.mapPartitionsToPair(multiDoFnFunction);
@@ -420,6 +415,7 @@ public final class TransformTranslator {
       KvCoder<K, V> kvCoder,
       Coder<? extends BoundedWindow> windowCoder,
       JavaRDD<WindowedValue<KV<K, V>>> kvInRDD,
+      Partitioner partitioner,
       MultiDoFnFunction<KV<K, V>, OutputT> doFnFunction) {
     Coder<K> keyCoder = kvCoder.getKeyCoder();
 
@@ -427,7 +423,7 @@ public final class TransformTranslator {
         WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), windowCoder);
 
     JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupRDD =
-        GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, null);
+        GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
     return groupRDD
         .map(
@@ -550,6 +546,15 @@ public final class TransformTranslator {
     };
   }
 
+  @Nullable
+  private static Partitioner getPartitioner(EvaluationContext context) {
+    Long bundleSize =
+        context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize();
+    return (bundleSize > 0)
+        ? null
+        : new HashPartitioner(context.getSparkContext().defaultParallelism());
+  }
+
   private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();
 
   static {
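
The extracted helper threads the partitioner down to
GroupCombineFunctions.groupByKeyOnly. Independent of Beam, the effect of an
explicit partitioner on a Spark groupByKey can be seen in this self-contained
sketch (hypothetical data; not Beam's actual shuffle internals):

    import java.util.Arrays;
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class PartitionerSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "partitioner-sketch");
        JavaPairRDD<String, Integer> pairs =
            jsc.parallelizePairs(
                Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)));
        // An explicit partitioner pins the number of post-shuffle partitions;
        // with no partitioner Spark derives one from the parent RDD instead.
        JavaPairRDD<String, Iterable<Integer>> grouped =
            pairs.groupByKey(new HashPartitioner(jsc.defaultParallelism()));
        System.out.println(grouped.getNumPartitions()); // == defaultParallelism
        jsc.stop();
      }
    }
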
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 067a95c..5238f19 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.spark.Accumulator;
+import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.JavaSparkContext$;
@@ -304,7 +305,11 @@ public final class StreamingTransformTranslator {
         JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKeyStream =
             dStream.transform(
                 rdd ->
-                    GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, null));
+                    GroupCombineFunctions.groupByKeyOnly(
+                        rdd,
+                        coder.getKeyCoder(),
+                        wvCoder,
+                        new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism())));
 
         // --- now group also by window.
         JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =

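On the streaming side there is no single up-front context to size the
partitioner from, so it is rebuilt inside transform() for every micro-batch
from that batch RDD's own SparkContext. A self-contained sketch of the same
pattern in plain Spark Streaming (hypothetical socket source; not Beam code):

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    public class StreamingPartitionerSketch {
      public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sketch");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        // Rebuild the partitioner per micro-batch from the batch RDD's own
        // SparkContext, mirroring the translator's rdd.rdd().sparkContext() call.
        JavaDStream<String> keys =
            lines.transform(
                rdd -> {
                  HashPartitioner partitioner =
                      new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
                  return rdd.mapToPair(s -> new Tuple2<>(s, 1))
                      .groupByKey(partitioner)
                      .keys();
                });
        keys.print();
        jssc.start();
        jssc.awaitTermination();
      }
    }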