yunfengzhou-hub commented on code in PR #24748:
URL: https://github.com/apache/flink/pull/24748#discussion_r1600886728


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java:
##########
@@ -188,6 +190,35 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
                 (AsyncExecutionController<K>) asyncExecutionController);
     }
 
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        if (!isAsyncStateProcessingEnabled()) {
+            super.processWatermark(mark);
+            return;
+        }
+        asyncExecutionController.processNonRecord(() -> super.processWatermark(mark));

Review Comment:
   We may also need to override `processWatermark1` and `processWatermark2`, 
or override `processWatermark(Watermark, int)` instead, as is done for 
`processWatermarkStatus`. I understand that this PR is mainly responsible for 
introducing the epoch mechanism, and that a separate Jira ticket and PR will 
apply epochs to all events and all cases, so it is also OK with me if you 
would like to make this change in the next PR.
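
   To illustrate the first option, here is a minimal sketch that applies the same guard-then-defer pattern to both per-input methods. It does not use Flink's real types: `FakeController`, `BaseTwoInputOperator`, and `AsyncTwoInputOperator` are hypothetical stand-ins, a `long` timestamp replaces `Watermark`, and `throws Exception` is dropped for brevity. It shows only the shape of the override and is not meant as the actual change.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkOverrideSketch {

    /** Hypothetical stand-in for the async execution controller: records the call, then runs it. */
    public static class FakeController {
        public final List<String> log = new ArrayList<>();

        public void processNonRecord(Runnable action) {
            log.add("deferred");
            action.run();
        }
    }

    /** Hypothetical stand-in for a base operator with one watermark method per input. */
    public static class BaseTwoInputOperator {
        public final List<String> handled = new ArrayList<>();

        public void processWatermark1(long timestamp) {
            handled.add("input1@" + timestamp);
        }

        public void processWatermark2(long timestamp) {
            handled.add("input2@" + timestamp);
        }
    }

    /** Applies the same pattern as the single-input override to both inputs. */
    public static class AsyncTwoInputOperator extends BaseTwoInputOperator {
        public final FakeController controller = new FakeController();
        private final boolean asyncEnabled;

        public AsyncTwoInputOperator(boolean asyncEnabled) {
            this.asyncEnabled = asyncEnabled;
        }

        @Override
        public void processWatermark1(long timestamp) {
            if (!asyncEnabled) {
                super.processWatermark1(timestamp);
                return;
            }
            // Route through the controller instead of calling the base method directly.
            controller.processNonRecord(() -> super.processWatermark1(timestamp));
        }

        @Override
        public void processWatermark2(long timestamp) {
            if (!asyncEnabled) {
                super.processWatermark2(timestamp);
                return;
            }
            controller.processNonRecord(() -> super.processWatermark2(timestamp));
        }
    }

    public static void main(String[] args) {
        AsyncTwoInputOperator op = new AsyncTwoInputOperator(true);
        op.processWatermark1(5L);
        op.processWatermark2(7L);
        System.out.println(op.handled);
        System.out.println(op.controller.log);
    }
}
```

   The second option would wrap a single method that takes the input index, which avoids duplicating the guard for each input.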



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
