This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 06f9ff8 [BEAM-4783] Fix issues created in #6181.
new d3709b0 Merge pull request #7690: [BEAM-4783] Fix issues created in #6181
06f9ff8 is described below
commit 06f9ff8c7264dfc890788acd03dd762837d0ddce
Author: Kyle Winkelman <[email protected]>
AuthorDate: Thu Jan 31 10:52:12 2019 -0600
[BEAM-4783] Fix issues created in #6181.
---
.../spark/translation/TransformTranslator.java | 21 +++++++++++++--------
.../streaming/StreamingTransformTranslator.java | 7 ++++++-
2 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a555e6e..c6fc434 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -133,14 +133,8 @@ public final class TransformTranslator {
WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
// --- group by key only.
- Long bundleSize =
- context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize();
- Partitioner partitioner =
- (bundleSize > 0)
- ? new HashPartitioner(context.getSparkContext().defaultParallelism())
- : null;
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
- GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner);
+ GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, getPartitioner(context));
// --- now group also by window.
// for batch, GroupAlsoByWindow uses an in-memory StateInternals.
@@ -377,6 +371,7 @@ public final class TransformTranslator {
(KvCoder) context.getInput(transform).getCoder(),
windowingStrategy.getWindowFn().windowCoder(),
(JavaRDD) inRDD,
+ getPartitioner(context),
(MultiDoFnFunction) multiDoFnFunction);
} else {
all = inRDD.mapPartitionsToPair(multiDoFnFunction);
@@ -420,6 +415,7 @@ public final class TransformTranslator {
KvCoder<K, V> kvCoder,
Coder<? extends BoundedWindow> windowCoder,
JavaRDD<WindowedValue<KV<K, V>>> kvInRDD,
+ Partitioner partitioner,
MultiDoFnFunction<KV<K, V>, OutputT> doFnFunction) {
Coder<K> keyCoder = kvCoder.getKeyCoder();
@@ -427,7 +423,7 @@ public final class TransformTranslator {
WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), windowCoder);
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupRDD =
- GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, null);
+ GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
return groupRDD
.map(
@@ -550,6 +546,15 @@ public final class TransformTranslator {
};
}
+ @Nullable
+ private static Partitioner getPartitioner(EvaluationContext context) {
+ Long bundleSize =
+ context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize();
+ return (bundleSize > 0)
+ ? null
+ : new HashPartitioner(context.getSparkContext().defaultParallelism());
+ }
+
private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();
static {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 067a95c..5238f19 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.spark.Accumulator;
+import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext$;
@@ -304,7 +305,11 @@ public final class StreamingTransformTranslator {
JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKeyStream =
dStream.transform(
rdd ->
- GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, null));
+ GroupCombineFunctions.groupByKeyOnly(
+ rdd,
+ coder.getKeyCoder(),
+ wvCoder,
+ new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism())));
// --- now group also by window.
JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =