twosom commented on code in PR #33322:
URL: https://github.com/apache/beam/pull/33322#discussion_r1882995425
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java:
##########
@@ -49,18 +53,36 @@ public static <K, V> JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupByKeyOnly(
       @Nullable Partitioner partitioner) {
     // we use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
-    JavaPairRDD<ByteArray, byte[]> pairRDD =
-        rdd.map(new ReifyTimestampsAndWindowsFunction<>())
-            .mapToPair(TranslationUtils.toPairFunction())
-            .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-
-    // If no partitioner is passed, the default group by key operation is called
-    JavaPairRDD<ByteArray, Iterable<byte[]>> groupedRDD =
-        (partitioner != null) ? pairRDD.groupByKey(partitioner) : pairRDD.groupByKey();
-
-    return groupedRDD
-        .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
-        .map(new TranslationUtils.FromPairFunction<>());
+    final JavaPairRDD<ByteArray, byte[]> pairRDD =
+        rdd.mapPartitionsToPair(
Review Comment:
Yes, there are several advantages to using `mapPartitionsToPair` and `mapPartitions` here. They process an entire partition in one call instead of invoking a function per element, which cuts per-record function call overhead. Fusing the chained transformations into a single pass also eliminates the intermediate RDDs between them, and in this case it reduced the shuffle size substantially (6.1 GiB → 1487.7 MiB). Spark can also optimize the combined operation better, lowering serialization overhead. The fused form is slightly harder to read, but for a hot path like `GroupByKey` the performance benefit outweighs that trade-off; a minimal sketch of the pattern follows.
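
For illustration, here is a small, self-contained Spark sketch of the pattern, not the Beam code itself: a chain of per-element `map`/`mapToPair` calls versus a single `mapPartitionsToPair` pass that fuses the same steps. The `"key:value"` input format and the split/encode steps are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class MapPartitionsSketch {
  public static void main(String[] args) {
    try (JavaSparkContext sc = new JavaSparkContext("local[2]", "map-partitions-sketch")) {
      JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "b:2", "a:3"));

      // Per-element: three chained transformations, so every record passes
      // through three separate function invocations and two intermediate RDDs.
      JavaPairRDD<String, byte[]> perElement =
          rdd.map(s -> s.split(":"))
              .mapToPair(parts -> new Tuple2<>(parts[0], parts[1]))
              .mapToPair(kv -> new Tuple2<>(kv._1, kv._2.getBytes()));

      // Per-partition: one pass over the partition's iterator performs all
      // three steps with a single function invocation per partition.
      // (A production version would return a lazy iterator instead of
      // materializing the whole partition into a list.)
      JavaPairRDD<String, byte[]> perPartition =
          rdd.mapPartitionsToPair(
              (Iterator<String> iter) -> {
                List<Tuple2<String, byte[]>> out = new ArrayList<>();
                while (iter.hasNext()) {
                  String[] parts = iter.next().split(":");
                  out.add(new Tuple2<>(parts[0], parts[1].getBytes()));
                }
                return out.iterator();
              });

      System.out.println(perElement.collectAsMap().keySet());
      System.out.println(perPartition.collectAsMap().keySet());
    }
  }
}
```

Both pipelines produce the same pairs; the per-partition version just keeps the per-record work in one tight loop. The one caveat is memory: buffering a partition into a list (as this sketch does for brevity) can be costly for large partitions, which is why a lazy iterator is preferable in real code.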