dmvk commented on a change in pull request #15994:
URL: https://github.com/apache/beam/pull/15994#discussion_r754658243



##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
##########
@@ -52,6 +52,8 @@
       Instant outputTimestamp,
       TimeDomain timeDomain);
 
+  <KeyT> void onWindowExpiration(BoundedWindow window, Instant outputTimestamp, KeyT key);

Review comment:
       ```suggestion
     /** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
     <KeyT> void onWindowExpiration(BoundedWindow window, Instant outputTimestamp, KeyT key);
   ```

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
     }
   }
 
+  public List<ByteBuffer> getGlobalWindowStateKeys() {

Review comment:
       This will materialize all of the keys in-memory, which could be a 
problem with really large state (when using RocksDB).
   
   Also, since we're collecting keys into a list, there will be duplicates if we 
have multiple states per key 🤔 (so we could expire a single window multiple 
times).
   
   That being said, I don't know how to solve the "materialization" problem in 
combination with de-duplication :( (it would be fairly easy if we'd have only a 
single descriptor)
   
   So for now we should at least replace the list with a set to address the possible 
duplicates. Adding a failing test case for this would also be great.
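
   A minimal sketch of the set-based de-duplication, in plain Java rather than 
against the Flink backend. `GlobalWindowKeyDedup`, `distinctKeys` and the 
descriptor-to-keys map are hypothetical stand-ins, not code from this PR. Note 
that this only removes duplicates; the keys are still fully materialized.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class GlobalWindowKeyDedup {

  /**
   * Collects the encoded keys of every state descriptor into a set.
   * The map (descriptor name -> encoded keys) is a hypothetical stand-in
   * for whatever the state backend reports per descriptor.
   */
  static Set<ByteBuffer> distinctKeys(Map<String, List<ByteBuffer>> keysByDescriptor) {
    // ByteBuffer.equals/hashCode compare the remaining bytes, so two buffers
    // wrapping the same encoded key collapse into a single set entry.
    Set<ByteBuffer> keys = new LinkedHashSet<>();
    for (List<ByteBuffer> perDescriptor : keysByDescriptor.values()) {
      keys.addAll(perDescriptor);
    }
    return keys;
  }

  /** Helper for the example only: encodes a key as UTF-8 bytes. */
  static ByteBuffer encode(String key) {
    return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // Key "k1" has two states, so a plain list would contain it twice.
    Map<String, List<ByteBuffer>> keysByDescriptor =
        Map.of(
            "stateA", List.of(encode("k1"), encode("k2")),
            "stateB", List.of(encode("k1")));
    System.out.println(distinctKeys(keysByDescriptor).size());
  }
}
```

   The buffers must not be read from (position changed) while they sit in the 
set, since that would change their hash code.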

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -791,11 +796,30 @@ private long computeOutputWatermark(long inputWatermarkHold) {
 
   private void maybeEmitWatermark(long watermark) {
     if (watermark > currentOutputWatermark) {
+      // If this is the end of the global window, then call onWindowExpiration callbacks. For other
+      // windows, this will
+      // be called as part of the garbage-collection timer.

Review comment:
       nit
   ```suggestion
         // windows, this will be called as part of the garbage-collection timer.
   ```

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
     }
   }
 
+  public List<ByteBuffer> getGlobalWindowStateKeys() {

Review comment:
       Hmm, on second thought, we could probably just pick a single state 
descriptor that we know is always present (one of the system ones, e.g. the 
watermark hold). Then we should be able to expire keys one by one.
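
   Roughly what I have in mind, as a plain-Java sketch. `KeyLister`, 
`expireAll` and the descriptor name are hypothetical and only model the idea; 
the real backend call is not shown. Since a single descriptor lists each key 
once, no de-duplication is needed and keys can be consumed lazily.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class SingleDescriptorExpiry {

  /** Hypothetical view of a state backend: lazily lists the keys that hold a given state. */
  interface KeyLister<K> {
    Iterator<K> keysFor(String descriptorName);
  }

  /**
   * Iterates the keys of one descriptor that is assumed to exist for every key
   * and invokes the expiration callback once per key, without building a list.
   * Returns the number of keys visited.
   */
  static <K> int expireAll(
      KeyLister<K> lister, String alwaysPresentDescriptor, Consumer<K> onWindowExpiration) {
    int expired = 0;
    Iterator<K> keys = lister.keysFor(alwaysPresentDescriptor);
    while (keys.hasNext()) {
      onWindowExpiration.accept(keys.next());
      expired++;
    }
    return expired;
  }

  public static void main(String[] args) {
    // Stand-in backend: every descriptor reports the same two keys.
    KeyLister<String> lister = name -> List.of("k1", "k2").iterator();
    List<String> seen = new ArrayList<>();
    int count = SingleDescriptorExpiry.<String>expireAll(lister, "watermark-hold", seen::add);
    System.out.println(count + " keys expired: " + seen);
  }
}
```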

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -215,6 +216,22 @@ public void clearGlobalState() {
     }
   }
 
+  public List<ByteBuffer> getGlobalWindowStateKeys() {

Review comment:
       nit
   ```suggestion
     @SuppressWarnings("unchecked")
     public List<ByteBuffer> getGlobalWindowStateKeys() {
   ```

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
##########
@@ -186,6 +201,12 @@ public TimerInternals timerInternals() {
 
     fireEligibleTimers(key, timerInternals, doFnRunner);
 
+    if (usesOnWindowExpiration) {
+      for (BoundedWindow window : windowsSeen) {

Review comment:
       My only concern here is that all the windows for a single key need 
to fit in memory. For example, a year's worth of 1-minute tumbling windows would be ~500k 
windows. IMO this could be quite common in the batch case.
   
   What about a simple optimization here: reusing the timestamp sorting 
mechanism that we already have in place for `@RequiresTimeSortedInput` [1]?
   
   Then we could simply check for expiring windows after each element, i.e. every 
time the timestamp progresses.
   
   [1] 
https://github.com/apache/beam/blob/v2.34.0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L702
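
   A sketch of the idea in plain Java, assuming the elements of a key arrive 
sorted by timestamp, windows are fixed-size, and allowed lateness is zero. 
`SortedInputWindowExpiry` and `expireInOrder` are hypothetical names and 
timestamps are plain millisecond longs rather than Beam types. Only windows 
that can still receive elements are kept, so memory stays bounded by the number 
of concurrently open windows instead of all windows seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class SortedInputWindowExpiry {

  /**
   * Walks timestamps sorted in ascending order and returns the (exclusive) end
   * of each fixed window in the order in which it would be expired.
   */
  static List<Long> expireInOrder(long[] sortedTimestamps, long windowSizeMillis) {
    List<Long> expiredWindowEnds = new ArrayList<>();
    TreeSet<Long> openWindowEnds = new TreeSet<>();
    for (long timestamp : sortedTimestamps) {
      // Input is sorted, so a window whose end is <= the current timestamp
      // cannot receive further elements and can be expired right away.
      while (!openWindowEnds.isEmpty() && openWindowEnds.first() <= timestamp) {
        expiredWindowEnds.add(openWindowEnds.pollFirst());
      }
      long windowStart = Math.floorDiv(timestamp, windowSizeMillis) * windowSizeMillis;
      openWindowEnds.add(windowStart + windowSizeMillis);
    }
    // End of input for this key: expire whatever is still open.
    while (!openWindowEnds.isEmpty()) {
      expiredWindowEnds.add(openWindowEnds.pollFirst());
    }
    return expiredWindowEnds;
  }

  public static void main(String[] args) {
    long[] timestamps = {0L, 10L, 59_999L, 60_000L, 185_000L};
    System.out.println(expireInOrder(timestamps, 60_000L));
  }
}
```

   With non-overlapping windows this never holds more than one open window per 
key; the `TreeSet` is there so the same loop would also cope with overlapping 
windows.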





