damccorm commented on code in PR #33322:
URL: https://github.com/apache/beam/pull/33322#discussion_r1882614060
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctionsTest.java:
##########
@@ -121,54 +112,6 @@ public void testGbkIteratorValuesCannotBeReiterated() throws Coder.NonDeterminis
}
}
- @Test
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void testGroupByKeyInGlobalWindowWithPartitioner() {
- // mocking
- Partitioner mockPartitioner = mock(Partitioner.class);
- JavaRDD mockRdd = mock(JavaRDD.class);
- Coder mockKeyCoder = mock(Coder.class);
- Coder mockValueCoder = mock(Coder.class);
- JavaPairRDD mockRawKeyValues = mock(JavaPairRDD.class);
- JavaPairRDD mockGrouped = mock(JavaPairRDD.class);
-
- when(mockRdd.mapToPair(any())).thenReturn(mockRawKeyValues);
- when(mockRawKeyValues.groupByKey(any(Partitioner.class)))
- .thenAnswer(
- invocation -> {
- Partitioner partitioner = invocation.getArgument(0);
- assertEquals(partitioner, mockPartitioner);
- return mockGrouped;
- });
- when(mockGrouped.map(any())).thenReturn(mock(JavaRDD.class));
-
- GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
- mockRdd, mockKeyCoder, mockValueCoder, mockPartitioner);
-
- verify(mockRawKeyValues, never()).groupByKey();
- verify(mockRawKeyValues, times(1)).groupByKey(any(Partitioner.class));
- }
-
- @Test
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void testGroupByKeyInGlobalWindowWithoutPartitioner() {
- // mocking
- JavaRDD mockRdd = mock(JavaRDD.class);
- Coder mockKeyCoder = mock(Coder.class);
- Coder mockValueCoder = mock(Coder.class);
- JavaPairRDD mockRawKeyValues = mock(JavaPairRDD.class);
- JavaPairRDD mockGrouped = mock(JavaPairRDD.class);
-
- when(mockRdd.mapToPair(any())).thenReturn(mockRawKeyValues);
- when(mockRawKeyValues.groupByKey()).thenReturn(mockGrouped);
-
- GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
- mockRdd, mockKeyCoder, mockValueCoder, null);
-
- verify(mockRawKeyValues, times(1)).groupByKey();
- verify(mockRawKeyValues, never()).groupByKey(any(Partitioner.class));
Review Comment:
Why did we need to get rid of these 2 tests?
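If the helper still routes through `mapToPair`, these two checks could arguably be kept as one condensed test rather than dropped. A rough sketch (assuming the current `groupByKeyInGlobalWindow` signature and the test class's existing Mockito imports; the test name is made up):

```java
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testPartitionerIsForwardedToGroupByKey() {
  Partitioner mockPartitioner = mock(Partitioner.class);
  JavaRDD mockRdd = mock(JavaRDD.class);
  JavaPairRDD mockRawKeyValues = mock(JavaPairRDD.class);
  JavaPairRDD mockGrouped = mock(JavaPairRDD.class);

  when(mockRdd.mapToPair(any())).thenReturn(mockRawKeyValues);
  when(mockRawKeyValues.groupByKey(any(Partitioner.class))).thenReturn(mockGrouped);
  when(mockGrouped.map(any())).thenReturn(mock(JavaRDD.class));

  GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
      mockRdd, mock(Coder.class), mock(Coder.class), mockPartitioner);

  // The explicit partitioner overload should be used, never the default groupByKey().
  verify(mockRawKeyValues).groupByKey(mockPartitioner);
  verify(mockRawKeyValues, never()).groupByKey();
}
```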
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java:
##########
@@ -49,18 +53,36 @@ public static <K, V> JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupByKeyOnly(
@Nullable Partitioner partitioner) {
// we use coders to convert objects in the PCollection to byte arrays, so they
// can be transferred over the network for the shuffle.
- JavaPairRDD<ByteArray, byte[]> pairRDD =
- rdd.map(new ReifyTimestampsAndWindowsFunction<>())
- .mapToPair(TranslationUtils.toPairFunction())
- .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-
- // If no partitioner is passed, the default group by key operation is called
- JavaPairRDD<ByteArray, Iterable<byte[]>> groupedRDD =
- (partitioner != null) ? pairRDD.groupByKey(partitioner) : pairRDD.groupByKey();
-
- return groupedRDD
- .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
- .map(new TranslationUtils.FromPairFunction<>());
+ final JavaPairRDD<ByteArray, byte[]> pairRDD =
+ rdd.mapPartitionsToPair(
Review Comment:
Is there an advantage to doing this in a single mapPartitionsToPair function (as opposed to the original implementation)? It is a little harder to read and reuses a little less code.
The same question applies to the mapping after the combineByKey.
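For context on the readability trade-off, a generic sketch of the two styles (not the Beam code; `MapPartitionsSketch`, `chained`, `fused`, and `lines` are made-up names for illustration):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

class MapPartitionsSketch {

  // Original style: two chained narrow transformations, each a small reusable function.
  static JavaPairRDD<String, byte[]> chained(JavaRDD<String> lines) {
    return lines
        .map(String::trim)
        .mapToPair(s -> new Tuple2<>(s, s.getBytes(StandardCharsets.UTF_8)));
  }

  // Fused style: the same work done in one mapPartitionsToPair pass per partition.
  static JavaPairRDD<String, byte[]> fused(JavaRDD<String> lines) {
    return lines.mapPartitionsToPair(
        iter -> {
          List<Tuple2<String, byte[]>> out = new ArrayList<>();
          while (iter.hasNext()) {
            String s = iter.next().trim();
            out.add(new Tuple2<>(s, s.getBytes(StandardCharsets.UTF_8)));
          }
          return out.iterator();
        });
  }
}
```

As far as I understand, both forms are narrow transformations that Spark pipelines within the same stage, so the fused version mostly saves per-element function-call/wrapper overhead rather than changing the shuffle. If that overhead is the motivation here, a short comment in the code would help.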