After looking further into
[BEAM-1815](https://jira.apache.org/jira/browse/BEAM-1815), my understanding is
that the double shuffle happened in streaming mode only, because the
groupByKey in GroupCombineFunctions.groupByKeyOnly is followed by the
updateStateByKey in SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow. This
does not happen in batch mode, so I believe we should always use groupByKey()
without the HashPartitioner when in batch mode. That way we don't squash a
large amount of data into a few partitions.
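For illustration, here is a minimal standalone Spark sketch of the partitioning behaviour in question (this is not Beam code; the RDD contents, partition counts, and class name are made up for the example):

```java
import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class DefaultPartitionerSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[4]", "default-partitioner-sketch");

    // A parent RDD with many partitions, like a wide upstream stage
    // feeding a GroupByKey.
    JavaPairRDD<String, Integer> input =
        sc.parallelizePairs(
            Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)), 64);

    // groupByKey() without an explicit partitioner goes through Spark's
    // defaultPartitioner, which preserves the upstream parallelism
    // (prints 64 here, assuming spark.default.parallelism is unset).
    System.out.println(input.groupByKey().getNumPartitions());

    // groupByKey(new HashPartitioner(k)) with a small k squashes all the
    // data into k partitions (prints 2), which is the behaviour we want
    // to avoid in batch mode.
    System.out.println(input.groupByKey(new HashPartitioner(2)).getNumPartitions());

    sc.stop();
  }
}
```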

If you agree, I will change this (in TransformTranslator):
```java
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey;
if (context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize() > 0) {
  groupedByKey =
      GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
} else {
  groupedByKey = GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder);
}
```
to:
```java
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
    GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
```
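With that version, the batch translation reuses an existing upstream partitioner when one is present and otherwise matches the upstream partition count, instead of forcing everything through a fixed HashPartitioner; the streaming path through SparkGroupAlsoByWindowViaWindowSet, where the updateStateByKey from BEAM-1815 actually lives, is untouched by this.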

[ Full content available at: https://github.com/apache/beam/pull/6181 ]