After looking further into
[BEAM-1815](https://jira.apache.org/jira/browse/BEAM-1815), my understanding is
that the double shuffle happened only in streaming mode, because the
groupByKey in GroupCombineFunctions.groupByKeyOnly is followed by updateStateByKey
in SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow. This cannot happen
in batch mode, so I believe we should always use groupByKey() without the
explicit HashPartitioner when in batch mode. That way we don't squash a large
amount of data into a few partitions.
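
As an aside, here is a minimal standalone Spark sketch of the squashing concern (the class name and data below are hypothetical, not taken from the runner code): grouping with an explicit, small HashPartitioner forces all data into that fixed number of partitions, while plain groupByKey() defers to the parent RDD's partitioning.
```java
import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Hypothetical illustration only; not part of the Beam Spark runner.
public class PartitionerSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "partitioner-sketch");

    // Hypothetical input: a pair RDD with plenty of upstream parallelism.
    JavaPairRDD<String, Integer> pairs =
        sc.parallelizePairs(
            Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)), 8);

    // Explicit HashPartitioner with a small count: the shuffle squashes
    // everything into just two partitions, whatever the parent had.
    JavaPairRDD<String, Iterable<Integer>> squashed = pairs.groupByKey(new HashPartitioner(2));

    // Plain groupByKey(): Spark sizes the shuffle from the parent RDD
    // (or spark.default.parallelism), preserving the existing parallelism.
    JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();

    System.out.println("explicit partitioner: " + squashed.getNumPartitions());
    System.out.println("default partitioner:  " + grouped.getNumPartitions());

    sc.stop();
  }
}
```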
If you agree, I will change this (in TransformTranslator):
```java
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey;
if (context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize() > 0) {
  groupedByKey =
      GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
} else {
  groupedByKey = GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder);
}
```
to:
```java
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
    GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
```