damccorm commented on code in PR #33322:
URL: https://github.com/apache/beam/pull/33322#discussion_r1884006358
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java:
##########
@@ -49,18 +53,36 @@ public static <K, V> JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupByKeyOnly(
       @Nullable Partitioner partitioner) {
     // we use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
-    JavaPairRDD<ByteArray, byte[]> pairRDD =
-        rdd.map(new ReifyTimestampsAndWindowsFunction<>())
-            .mapToPair(TranslationUtils.toPairFunction())
-            .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-
-    // If no partitioner is passed, the default group by key operation is called
-    JavaPairRDD<ByteArray, Iterable<byte[]>> groupedRDD =
-        (partitioner != null) ? pairRDD.groupByKey(partitioner) : pairRDD.groupByKey();
-
-    return groupedRDD
-        .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
-        .map(new TranslationUtils.FromPairFunction<>());
+    final JavaPairRDD<ByteArray, byte[]> pairRDD =
+        rdd.mapPartitionsToPair(
Review Comment:
> Additionally, Spark can better optimize execution with combined operations, reducing serialization overhead.

Got it - this was the piece that I was missing - thanks!
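
For anyone following along, here is a minimal sketch of the pattern under discussion: fusing chained per-element `map`/`mapToPair` calls into a single `mapPartitionsToPair`, so Spark serializes and ships one combined closure and traverses each partition once. The class name, element types, and transform logic below are illustrative placeholders, not the PR's actual code:

```java
import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Hypothetical sketch: instead of rdd.map(...).mapToPair(...).mapToPair(...),
// which serializes three separate function objects and wraps every element
// three times, run all steps inline in one pass over each partition.
class FusedEncodeSketch {
  static JavaPairRDD<byte[], byte[]> encode(JavaRDD<String> rdd) {
    return rdd.mapPartitionsToPair(
        (Iterator<String> it) ->
            new Iterator<Tuple2<byte[], byte[]>>() {
              @Override
              public boolean hasNext() {
                return it.hasNext();
              }

              @Override
              public Tuple2<byte[], byte[]> next() {
                String v = it.next();
                // "reify -> pair -> encode" fused into a single lazy step;
                // the transforms here are stand-ins for the real ones.
                return new Tuple2<>(v.getBytes(), v.toUpperCase().getBytes());
              }
            });
  }
}
```

Returning a lazy iterator keeps the streaming, element-at-a-time semantics of the chained version while cutting the per-element function-call and wrapper-object overhead down to one closure per partition.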