fredia commented on code in PR #25816:
URL: https://github.com/apache/flink/pull/25816#discussion_r1890123317


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java:
##########
@@ -167,43 +217,78 @@ public static class Epoch {
         /** The number of records that are still ongoing in this epoch. */
         int ongoingRecordCount;
 
-        /** The action associated with non-record of this epoch(e.g. advance 
watermark). */
-        @Nullable Runnable action;
+        /** The action associated with non-record of this epoch(e.g. 
triggering timer). */
+        @Nullable Runnable triggerAction;
+
+        /**
+         * The action when we finish this epoch and the triggerAction as well 
as any async
+         * processing.
+         */
+        @Nullable Runnable finalAction;
 
         EpochStatus status;
 
         public Epoch(long id) {
             this.id = id;
             this.ongoingRecordCount = 0;
             this.status = EpochStatus.OPEN;
-            this.action = null;
+            this.triggerAction = null;
+            this.finalAction = null;
         }
 
         /**
-         * Try to finish this epoch.
+         * Try to finish this epoch. This is the core logic of triggering 
actions. The state machine
+         * and timeline are as follows:
+         *
+         * <pre>
+         * Action:     close()       triggerAction       wait             
finalAction
+         * Statue:  OPEN ----- CLOSED ----------FINISHING -------- FINISHED 
-----------
+         * </pre>
          *
          * @return whether this epoch has been normally finished.
          */
         boolean tryFinish() {
-            if (this.status == EpochStatus.FINISHED) {
+            if (status == EpochStatus.FINISHED) {

Review Comment:
   After we have `recursiveFlag`, can this branch be removed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java:
##########
@@ -68,12 +68,20 @@ public enum ParallelMode {
     /** Current active epoch, only one active epoch at the same time. */
     Epoch activeEpoch;
 
+    /** The epoch that will be provided to new records. */
+    @Nullable Epoch overrideEpoch;

Review Comment:
   IIUC, `overrideEpoch`  is a `closed` epoch which prepares for finishing.
   How about renaming it to `finishingEpoch`?



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java:
##########
@@ -113,12 +110,9 @@ public CompletableFuture<Void> advanceWatermark(long time) 
throws Exception {
             final InternalTimer<K, N> timerToTrigger = timer;
             CompletableFuture<Void> future = new CompletableFuture<>();

Review Comment:
   `future` is unusable now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to