Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1574559188
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
*/
final AtomicInteger inFlightRecordNum;
+ /**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if
yes, a trigger will
+ * perform actively when the next state request arrives even if the
activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+ final AtomicBoolean timeoutFlag;
+
+ /** The executor service that schedules and calls the triggers of this
task. */
+ final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ ScheduledFuture<Void> currentScheduledFuture;
+
public AsyncExecutionController(MailboxExecutor mailboxExecutor,
StateExecutor stateExecutor) {
- this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE,
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+ this(
+ mailboxExecutor,
+ stateExecutor,
+ DEFAULT_BATCH_SIZE,
+ DEFAULT_BUFFER_TIMEOUT,
+ DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
}
public AsyncExecutionController(
MailboxExecutor mailboxExecutor,
StateExecutor stateExecutor,
int batchSize,
+ long bufferTimeOut,
int maxInFlightRecords) {
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
this.mailboxExecutor = mailboxExecutor;
this.stateFutureFactory = new StateFutureFactory<>(this,
mailboxExecutor);
this.stateExecutor = stateExecutor;
this.batchSize = batchSize;
+ this.bufferTimeOut = bufferTimeOut;
this.maxInFlightRecordNum = maxInFlightRecords;
this.stateRequestsBuffer = new StateRequestBuffer<>();
this.inFlightRecordNum = new AtomicInteger(0);
+ this.timeoutFlag = new AtomicBoolean(false);
+
+ // ----------------- initialize buffer timeout -------------------
+ this.currentScheduledFuture = null;
+ if (bufferTimeOut > 0) {
+ this.scheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "AEC-scheduler");
+ }
+ });
+ this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+ // make sure shutdown removes all pending tasks
+
this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ } else {
+ this.scheduledExecutor = null;
+ }
+
LOG.info(
"Create AsyncExecutionController: batchSize {},
maxInFlightRecordsNum {}",
batchSize,
maxInFlightRecords);
}
+ void scheduleTimeout() {
+ if (bufferTimeOut > 0) {
+ if (currentScheduledFuture != null
+ && !currentScheduledFuture.isDone()
+ && !currentScheduledFuture.isCancelled()) {
+ currentScheduledFuture.cancel(false);
+ }
+ currentScheduledFuture =
+ (ScheduledFuture<Void>)
+ scheduledExecutor.schedule(
+ () -> {
+ timeoutFlag.set(true);
+ mailboxExecutor.execute(
+ () -> triggerIfNeeded(false),
"AEC-timeout");
Review Comment:
I'd suggest adding a `sequence number` to each buffer, to make sure a timeout
only triggers the buffer it was scheduled for.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
*/
final AtomicInteger inFlightRecordNum;
+ /**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if
yes, a trigger will
+ * perform actively when the next state request arrives even if the
activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+ final AtomicBoolean timeoutFlag;
+
+ /** The executor service that schedules and calls the triggers of this
task. */
+ final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ ScheduledFuture<Void> currentScheduledFuture;
+
public AsyncExecutionController(MailboxExecutor mailboxExecutor,
StateExecutor stateExecutor) {
- this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE,
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+ this(
+ mailboxExecutor,
+ stateExecutor,
+ DEFAULT_BATCH_SIZE,
+ DEFAULT_BUFFER_TIMEOUT,
+ DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
}
public AsyncExecutionController(
MailboxExecutor mailboxExecutor,
StateExecutor stateExecutor,
int batchSize,
+ long bufferTimeOut,
int maxInFlightRecords) {
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
this.mailboxExecutor = mailboxExecutor;
this.stateFutureFactory = new StateFutureFactory<>(this,
mailboxExecutor);
this.stateExecutor = stateExecutor;
this.batchSize = batchSize;
+ this.bufferTimeOut = bufferTimeOut;
this.maxInFlightRecordNum = maxInFlightRecords;
this.stateRequestsBuffer = new StateRequestBuffer<>();
this.inFlightRecordNum = new AtomicInteger(0);
+ this.timeoutFlag = new AtomicBoolean(false);
+
+ // ----------------- initialize buffer timeout -------------------
+ this.currentScheduledFuture = null;
+ if (bufferTimeOut > 0) {
+ this.scheduledExecutor =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactory() {
Review Comment:
How about using `new ExecutorThreadFactory("AEC-timeout-scheduler")` here instead of an anonymous `ThreadFactory`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]