echauchot commented on a change in pull request #13101:
URL: https://github.com/apache/beam/pull/13101#discussion_r518660241
##########
File path:
runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
##########
@@ -345,15 +360,27 @@ public void outputWindowedValue(
// empty outputs are filtered later using DStream filtering
final StateAndTimers updated =
new StateAndTimers(
+ stateInternals.getWatermarkHoldStates(),
stateInternals.getState(),
SparkTimerInternals.serializeTimers(
timerInternals.getTimers(), timerDataCoder));
+ Instant lowerBound = timerInternals.currentInputWatermarkTime();
Review comment:
First, a point on terminology: lowWatermark is the outputWatermark and
highWatermark is the inputWatermark, because the output WM is always lower than
or equal to the input WM, right?
Now, one question: in the for loop, you select the minimum state timestamp
among those that are before the current input WM, which means those elements
are the ones buffered in the GroupBy operation waiting to be output. So you then
set the Spark global watermark to this lower bound so that those elements are
not dropped, right?
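A minimal sketch of the lower-bound computation described above. It assumes the watermark holds can be reduced to plain `java.time.Instant` timestamps (the runner itself uses Joda-Time and the state returned by `stateInternals.getWatermarkHoldStates()`), and the class and method names are hypothetical, not part of the PR:

```java
import java.time.Instant;
import java.util.List;

// Hypothetical illustration; not the code from the PR.
public class WatermarkLowerBound {

  // Start from the current input watermark and lower it to the earliest hold
  // that is before it. The result can never exceed the input watermark, which
  // is why the output (low) watermark is <= the input (high) watermark.
  static Instant lowerBound(Instant inputWatermark, List<Instant> holds) {
    Instant lowerBound = inputWatermark;
    for (Instant hold : holds) {
      if (hold.isBefore(lowerBound)) {
        lowerBound = hold;
      }
    }
    return lowerBound;
  }

  public static void main(String[] args) {
    Instant input = Instant.ofEpochMilli(10_000);
    // One hold is behind the input watermark (still-buffered elements),
    // one is ahead of it and therefore does not lower the bound.
    List<Instant> holds = List.of(Instant.ofEpochMilli(7_000), Instant.ofEpochMilli(12_000));
    System.out.println(lowerBound(input, holds).toEpochMilli()); // prints 7000
  }
}
```

Setting the global watermark to this bound, rather than to the input watermark, keeps the buffered elements (timestamp 7000 here) from being considered late.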