echauchot commented on a change in pull request #13101:
URL: https://github.com/apache/beam/pull/13101#discussion_r518660241



##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
##########
@@ -345,15 +360,27 @@ public void outputWindowedValue(
             // empty outputs are filtered later using DStream filtering
             final StateAndTimers updated =
                 new StateAndTimers(
+                    stateInternals.getWatermarkHoldStates(),
                     stateInternals.getState(),
                     SparkTimerInternals.serializeTimers(
                         timerInternals.getTimers(), timerDataCoder));
 
+            Instant lowerBound = timerInternals.currentInputWatermarkTime();

Review comment:
       First, a point on terminology: lowWatermark is the output watermark and 
highWatermark is the input watermark, because the output WM cannot be ahead of 
the input WM, right?
   Now, one question: in the for loop, you select the minimum of the state 
times that are before the current input WM, which means those elements are the 
ones buffered in the group-by operation, waiting to be output. You then set the 
Spark global watermark to this lower bound so that those elements are not 
dropped, right?
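
   To make the question concrete, here is a minimal sketch of the selection as 
I read it. It is not the PR's code: it uses java.time.Instant instead of Joda 
time, and the class name, method name, and the flat collection of hold 
timestamps are all hypothetical stand-ins for the watermark hold states.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;

// Hypothetical helper, for illustration only; not part of Beam or this PR.
final class WatermarkLowerBound {

  // Returns the earliest hold that is before the input watermark,
  // or the input watermark itself when no hold is earlier.
  static Instant lowerBound(Instant inputWatermark, Collection<Instant> holds) {
    Instant lowerBound = inputWatermark;
    for (Instant hold : holds) {
      // Holds at or after the current bound are skipped;
      // an earlier hold pulls the bound down.
      if (hold.isBefore(lowerBound)) {
        lowerBound = hold;
      }
    }
    return lowerBound;
  }

  public static void main(String[] args) {
    Instant inputWm = Instant.ofEpochMilli(1000);
    // Assumed hold timestamps for buffered, not-yet-emitted elements.
    Collection<Instant> holds =
        Arrays.asList(
            Instant.ofEpochMilli(1200),
            Instant.ofEpochMilli(400),
            Instant.ofEpochMilli(700));
    System.out.println(lowerBound(inputWm, holds).toEpochMilli()); // prints 400
  }
}
```

   If that reading is right, the hold at 1200 is ignored because it is past the 
input WM, and the bound falls back to the input WM when nothing is buffered.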




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

