dmvk commented on a change in pull request #15994:
URL: https://github.com/apache/beam/pull/15994#discussion_r754658243
##########
File path:
runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
##########
@@ -52,6 +52,8 @@
Instant outputTimestamp,
TimeDomain timeDomain);
+ <KeyT> void onWindowExpiration(BoundedWindow window, Instant outputTimestamp, KeyT key);
Review comment:
```suggestion
/** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
<KeyT> void onWindowExpiration(BoundedWindow window, Instant outputTimestamp, KeyT key);
```
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
}
}
+ public List<ByteBuffer> getGlobalWindowStateKeys() {
Review comment:
This will materialize all of the keys in memory, which could be a problem with really large state (when using RocksDB).
Also, since we're collecting keys into a list, there will be duplicates if we have multiple states per key 🤔 (so we could expire a single window multiple times).
That being said, I don't know how to solve the "materialization" problem in combination with de-duplication :( (it would be fairly easy if we had only a single state descriptor).
So for now we should at least replace the list with a set to address the possible duplicates. Adding a failing test case for this would also be great.
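A minimal sketch of the list-vs-set point. The `Map` from state name to keys is a hypothetical stand-in for the state backend, not Flink's API, and `StateKeyCollector` is an illustrative name. `ByteBuffer#equals` and `#hashCode` compare the remaining bytes, so a `Set` keeps a key only once even when it appears under several descriptors (as long as the buffers are not mutated after insertion). Note that the set still holds every key in memory, so it addresses the duplicates but not the materialization problem.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper contrasting List and Set collection of global-window state keys. */
final class StateKeyCollector {
  private StateKeyCollector() {}

  /** Current behaviour: one entry per (state, key) pair, so keys with several states repeat. */
  static List<ByteBuffer> collectAllKeys(Map<String, List<ByteBuffer>> keysByStateName) {
    List<ByteBuffer> all = new ArrayList<>();
    for (List<ByteBuffer> stateKeys : keysByStateName.values()) {
      all.addAll(stateKeys);
    }
    return all;
  }

  /** Suggested fix: a Set de-duplicates by content, so each key would be expired once. */
  static Set<ByteBuffer> collectDistinctKeys(Map<String, List<ByteBuffer>> keysByStateName) {
    Set<ByteBuffer> distinct = new LinkedHashSet<>();
    for (List<ByteBuffer> stateKeys : keysByStateName.values()) {
      distinct.addAll(stateKeys);
    }
    return distinct;
  }

  /** Test helper: wraps a string's UTF-8 bytes as a key. */
  static ByteBuffer utf8Key(String key) {
    return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
  }
}
```

A failing test along these lines would register the same key under two state descriptors and assert that it is expired only once.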
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -791,11 +796,30 @@ private long computeOutputWatermark(long inputWatermarkHold) {
private void maybeEmitWatermark(long watermark) {
if (watermark > currentOutputWatermark) {
+ // If this is the end of the global window, then call onWindowExpiration callbacks. For other
+ // windows, this will
+ // be called as part of the garbage-collection timer.
Review comment:
nit
```suggestion
// windows, this will be called as part of the garbage-collection timer.
```
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
}
}
+ public List<ByteBuffer> getGlobalWindowStateKeys() {
Review comment:
Hmm, on second thought, we could probably just pick a single state descriptor that we know is always present (one of the system ones, e.g. the watermark hold). Then we should be able to expire keys one by one.
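A sketch of the single-descriptor idea, assuming (as the comment suggests) that some system state such as the watermark hold exists for every key that needs expiring. `KeysForState` and `GlobalWindowExpiry` are hypothetical names; a real backend would supply the lazy key stream. Because a key appears at most once for one descriptor, keys can be expired one at a time without collecting them or de-duplicating.

```java
import java.util.function.Consumer;
import java.util.stream.Stream;

/** Hypothetical stand-in for a keyed state backend: lazily streams the keys of one state. */
interface KeysForState<K> {
  Stream<K> keys(String stateDescriptorName);
}

final class GlobalWindowExpiry {
  private GlobalWindowExpiry() {}

  /**
   * Streams the keys of a single, always-present descriptor and invokes the expiration
   * callback per key. Nothing is materialized, and no Set is needed for de-duplication.
   */
  static <K> void expireAll(
      KeysForState<K> backend, String alwaysPresentState, Consumer<? super K> onWindowExpiration) {
    // Close the stream afterwards in case the backend holds an iterator resource.
    try (Stream<K> keys = backend.keys(alwaysPresentState)) {
      keys.forEach(onWindowExpiration);
    }
  }
}
```

The trade-off is correctness depending on that descriptor really being present for every key; a key that only has user state would be skipped.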
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
}
}
+ public List<ByteBuffer> getGlobalWindowStateKeys() {
Review comment:
nit
```suggestion
@SuppressWarnings("unchecked")
public List<ByteBuffer> getGlobalWindowStateKeys() {
```
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
##########
@@ -186,6 +201,12 @@ public TimerInternals timerInternals() {
fireEligibleTimers(key, timerInternals, doFnRunner);
+ if (usesOnWindowExpiration) {
+ for (BoundedWindow window : windowsSeen) {
Review comment:
My only concern here is that all the windows for a single key need to fit in memory. For example, a year's worth of 1-minute tumbling windows would be ~525k windows (365 × 24 × 60 = 525,600). IMO this could be quite common in the batch case.
What about a simple optimization here: reusing the timestamp-sorting mechanism that we already have in place for `@RequiresTimeSortedInput` [1]? Then we could simply check for expiring windows after each element, i.e. every time the timestamp progresses.
[1]
https://github.com/apache/beam/blob/v2.34.0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L702
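A sketch of checking for expiring windows as the timestamp progresses, assuming the elements for one key already arrive sorted by timestamp and windows are fixed windows `[start, start + size)`; allowed lateness is ignored. `SortedInputWindowExpiry` is a hypothetical name, not the existing `@RequiresTimeSortedInput` code. With sorted input and 1-minute fixed windows, at most one window per key is open at a time instead of ~525k.

```java
import java.util.TreeSet;
import java.util.function.LongConsumer;

/** Hypothetical per-key tracker that expires fixed windows as sorted timestamps advance. */
final class SortedInputWindowExpiry {
  private final long windowSizeMillis;
  private final LongConsumer onWindowExpiration; // called with the start of each expired window
  private final TreeSet<Long> openWindowStarts = new TreeSet<>();
  private long lastTimestamp = Long.MIN_VALUE;

  SortedInputWindowExpiry(long windowSizeMillis, LongConsumer onWindowExpiration) {
    this.windowSizeMillis = windowSizeMillis;
    this.onWindowExpiration = onWindowExpiration;
  }

  /** Elements for the key must arrive in non-decreasing timestamp order. */
  void processElement(long timestampMillis) {
    if (timestampMillis < lastTimestamp) {
      throw new IllegalArgumentException("input must be sorted by timestamp");
    }
    lastTimestamp = timestampMillis;
    // The timestamp has reached these windows' ends, so no later element can land in them.
    while (!openWindowStarts.isEmpty()
        && openWindowStarts.first() + windowSizeMillis <= timestampMillis) {
      onWindowExpiration.accept(openWindowStarts.pollFirst());
    }
    // Assign the element to its fixed window [start, start + size).
    openWindowStarts.add(Math.floorDiv(timestampMillis, windowSizeMillis) * windowSizeMillis);
  }

  /** End of input for this key: expire whatever is still open. */
  void finish() {
    while (!openWindowStarts.isEmpty()) {
      onWindowExpiration.accept(openWindowStarts.pollFirst());
    }
  }

  int openWindowCount() {
    return openWindowStarts.size();
  }
}
```

Using a sorted set rather than a single field keeps the sketch usable for overlapping windows, where more than one window can be open at once.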