je-ik commented on code in PR #33521:
URL: https://github.com/apache/beam/pull/33521#discussion_r1905432295


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java:
##########
@@ -146,32 +146,36 @@ public void evaluate(GroupByKey<K, V> transform, 
EvaluationContext context) {
 
         JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupedByKey;
         Partitioner partitioner = getPartitioner(context);
-        // As this is batch, we can ignore triggering and allowed lateness 
parameters.
-        if (windowingStrategy.getWindowFn().equals(new GlobalWindows())
-            && 
windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW))
 {
-          // we can drop the windows and recover them later
-          groupedByKey =
-              GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
-                  inRDD, keyCoder, coder.getValueCoder(), partitioner);

Review Comment:
   Actually, seems that these optimizations are not mutually exclusive. We 
might want to apply both.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java:
##########
@@ -146,32 +146,36 @@ public void evaluate(GroupByKey<K, V> transform, 
EvaluationContext context) {
 
         JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupedByKey;
         Partitioner partitioner = getPartitioner(context);
-        // As this is batch, we can ignore triggering and allowed lateness 
parameters.
-        if (windowingStrategy.getWindowFn().equals(new GlobalWindows())
-            && 
windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW))
 {
-          // we can drop the windows and recover them later
-          groupedByKey =
-              GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
-                  inRDD, keyCoder, coder.getValueCoder(), partitioner);
-        } else if 
(GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) 
{
+        if 
(GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) 
{
           // we can have a memory sensitive translation for non-merging windows
           groupedByKey =

Review Comment:
   I know this is not part of this issue, but there is unresolved problem with 
this - under certain circumstances there is a need for GBK result to be 
Reiterable (e.g. CoGroupByKey). Could we figure out a way to automatically 
figure out if this expansion is correct? From the top of my head - if a 
Pipeline does not use CoGBK at all (better if we can look is this transform is 
part of CoGBK expansion), then this might be OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to