kennknowles commented on a change in pull request #14120:
URL: https://github.com/apache/beam/pull/14120#discussion_r592528690



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
##########
@@ -83,7 +83,7 @@ public void reduce(
         new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
     AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
-    if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+    if (windowingStrategy.needsMerge() && windowingStrategy.getWindowFn() instanceof Sessions) {

Review comment:
       I wonder whether I was actually wrong and this check is redundant. I
would guess `FlinkMergingNonShuffleReduceFunction` is not chosen unless
`needsMerge` is already known to be true. Sorry, I missed the context for this
change. I think it is fine, but the `needsMerge` check may not be needed after all.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
##########
@@ -98,11 +98,10 @@ public void combine(
     if (groupedByWindow) {
       reduceRunner = new SingleWindowFlinkCombineRunner<>();
     } else {
-      if (windowingStrategy.needsMerge()
-          && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
-        reduceRunner = new HashingFlinkCombineRunner<>();
-      } else {
+      if (windowingStrategy.needsMerge() && windowingStrategy.getWindowFn() instanceof Sessions) {

Review comment:
       Is `SingleWindowFlinkCombineRunner` actually what is used for the
non-merging case?





