Zakelly commented on code in PR #25773:
URL: https://github.com/apache/flink/pull/25773#discussion_r1879241229


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -330,7 +332,22 @@ public void processWatermark(Watermark mark) throws 
Exception {
             super.processWatermark(mark);
             return;
         }
-        asyncExecutionController.processNonRecord(() -> 
super.processWatermark(mark));
+        asyncExecutionController.processNonRecord(
+                () -> {
+                    // todo: make async operator deal with interruptible 
watermark
+                    if (timeServiceManager != null) {
+                        CompletableFuture<Void> future = 
timeServiceManager.advanceWatermark(mark);
+                        if (asyncExecutionController.isSerialMode()) {
+                            asyncExecutionController.drainInflightRecords(0);
+                            future.get();
+                            output.emitWatermark(mark);
+                        } else {
+                            future.thenAccept(v -> output.emitWatermark(mark));
+                        }

Review Comment:
   How about package these logic into a method of `AEC`?
   e.g.
   
   ```suggestion
                           CompletableFuture<Void> future = 
timeServiceManager.advanceWatermark(mark);
                           future.thenAccept(v -> output.emitWatermark(mark));
                           asyncExecutionController.drainWithTimer();
   ```
   
   And we only do `assert future.isDone()` in the `drainWithTimer()` instead of 
the `future.get()`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -330,7 +332,22 @@ public void processWatermark(Watermark mark) throws 
Exception {
             super.processWatermark(mark);
             return;
         }
-        asyncExecutionController.processNonRecord(() -> 
super.processWatermark(mark));
+        asyncExecutionController.processNonRecord(
+                () -> {
+                    // todo: make async operator deal with interruptible 
watermark
+                    if (timeServiceManager != null) {
+                        CompletableFuture<Void> future = 
timeServiceManager.advanceWatermark(mark);
+                        if (asyncExecutionController.isSerialMode()) {
+                            asyncExecutionController.drainInflightRecords(0);
+                            future.get();
+                            output.emitWatermark(mark);
+                        } else {
+                            future.thenAccept(v -> output.emitWatermark(mark));
+                        }

Review Comment:
   And I think we could handle the interruptible timers in later PR(s).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to