fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577700082


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +106,80 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this 
task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different 
triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new 
ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) 
this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    
.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, 
maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout 
{}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != 
currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip 
this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), 
"AEC-timeout");

Review Comment:
   This approach may create an extra mail and put it in the mailbox. I lean 
toward skipping the stale trigger directly, before submitting anything to the 
mailbox.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to