davidradl commented on code in PR #25792:
URL: https://github.com/apache/flink/pull/25792#discussion_r1881730342


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java:
##########
@@ -259,17 +265,35 @@ public void processWatermark(Watermark mark) throws Exception {
             super.processWatermark(mark);
             return;
         }
-        asyncExecutionController.processNonRecord(
-                () -> {
-                    // todo: make async operator deal with interruptible watermark
-                    if (timeServiceManager != null) {
-                        CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
-                        future.thenAccept(v -> output.emitWatermark(mark));
-                        asyncExecutionController.drainWithTimerIfNeeded(future);
-                    } else {
+        asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null));
+    }
+
+    /**
+     * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right
+     * after the watermark is emitted.
+     *
+     * @param mark The watermark.
+     * @param postAction The runnable for post action.
+     */
+    protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction)
+            throws Exception {
+        // todo: make async operator deal with interruptible watermark
+        if (timeServiceManager != null) {
+            CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
+            future.thenAccept(
+                    v -> {
                         output.emitWatermark(mark);
-                    }
-                });
+                        if (postAction != null) {
+                            postAction.run();
+                        }
+                    });
+            asyncExecutionController.drainWithTimerIfNeeded(future);
+        } else {
+            output.emitWatermark(mark);

Review Comment:
   nit: the `future.thenAccept(...)` callback and the `else` branch seem to have
   the same logic (emit the watermark, then run the post action). Can we put
   this in a function?
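
   A rough sketch of the kind of extraction meant here, not the actual Flink
   code. It assumes the `else` branch (cut off in this hunk) also runs
   `postAction` after emitting. `WatermarkSketch`, `emitWatermarkAndThen`,
   `events` and `timerFuture` are hypothetical stand-ins: a `long` replaces
   `Watermark`, a list replaces `output`, and a nullable future replaces the
   `timeServiceManager` check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the operator; only the branching shape matters.
class WatermarkSketch {
    // Records what would be emitted via output.emitWatermark(mark) in the real operator.
    final List<String> events = new ArrayList<>();

    // Stand-in for the future returned by timeServiceManager.advanceWatermark(mark);
    // null plays the role of "timeServiceManager == null".
    CompletableFuture<Void> timerFuture;

    // The shared logic, extracted once: emit the watermark, then run the optional post action.
    void emitWatermarkAndThen(long mark, Runnable postAction) {
        events.add("emit:" + mark);
        if (postAction != null) {
            postAction.run();
        }
    }

    void doProcessWatermark(long mark, Runnable postAction) {
        if (timerFuture != null) {
            // Timers present: emit only after timer processing completes.
            // (The real code also calls asyncExecutionController.drainWithTimerIfNeeded(future).)
            timerFuture.thenAccept(v -> emitWatermarkAndThen(mark, postAction));
        } else {
            // No timers: emit immediately.
            emitWatermarkAndThen(mark, postAction);
        }
    }
}
```

   With this shape both branches differ only in when the helper runs, so a
   later change to the emit/post-action sequence is made in one place.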


