This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3c28414a7581e9a79da33fa5904a2a6aa7d7dc9
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Mon Jan 13 18:52:52 2025 +0800

    [FLINK-35412][Runtime] Remove `BatchCallbackRunner`
---
 .../asyncprocessing/AsyncExecutionController.java  |   4 +-
 .../asyncprocessing/BatchCallbackRunner.java       | 148 ---------------------
 .../asyncprocessing/CallbackRunnerWrapper.java     |  73 ++++++++++
 .../asyncprocessing/StateFutureFactory.java        |   4 +-
 ...nerTest.java => CallbackRunnerWrapperTest.java} |  29 +---
 5 files changed, 81 insertions(+), 177 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 62160d24de5..b253c236dca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -69,7 +69,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
     private final int batchSize;
 
     /** The runner for callbacks. Basically a wrapper of mailbox executor. */
-    private final BatchCallbackRunner callbackRunner;
+    private final CallbackRunnerWrapper callbackRunner;
 
     /**
      * The timeout of {@link StateRequestBuffer#activeQueue} triggering in 
milliseconds. If the
@@ -160,7 +160,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.exceptionHandler = exceptionHandler;
-        this.callbackRunner = new BatchCallbackRunner(mailboxExecutor, 
this::notifyNewMail);
+        this.callbackRunner = new CallbackRunnerWrapper(mailboxExecutor, 
this::notifyNewMail);
         this.stateFutureFactory = new StateFutureFactory<>(this, 
callbackRunner, exceptionHandler);
 
         this.stateExecutor = stateExecutor;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java
deleted file mode 100644
index d106f62bfbb..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.asyncprocessing;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.util.function.ThrowingRunnable;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import java.util.ArrayList;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A {@link org.apache.flink.core.state.StateFutureImpl.CallbackRunner} that 
put one mail in {@link
- * MailboxExecutor} but run multiple callbacks within one mail.
- */
-public class BatchCallbackRunner {
-
-    static final int DEFAULT_BATCH_SIZE = 3000;
-
-    private final MailboxExecutor mailboxExecutor;
-
-    private final int batchSize;
-
-    /** The callbacks divided in batch. */
-    private final ConcurrentLinkedDeque<ArrayList<ThrowingRunnable<? extends 
Exception>>>
-            callbackQueue;
-
-    /** The lock to protect the active buffer (batch). */
-    private final Object activeBufferLock = new Object();
-
-    /** The active buffer (batch) to gather incoming callbacks. */
-    @GuardedBy("activeBufferLock")
-    private ArrayList<ThrowingRunnable<? extends Exception>> activeBuffer;
-
-    /** Counter of current callbacks. */
-    private final AtomicInteger currentCallbacks = new AtomicInteger(0);
-
-    /** Whether there is a mail in mailbox. */
-    private volatile boolean hasMail = false;
-
-    /** The logic to notify new mails to AEC. */
-    private final Runnable newMailNotify;
-
-    BatchCallbackRunner(MailboxExecutor mailboxExecutor, Runnable 
newMailNotify) {
-        this.mailboxExecutor = mailboxExecutor;
-        this.newMailNotify = newMailNotify;
-        this.batchSize = DEFAULT_BATCH_SIZE;
-        this.callbackQueue = new ConcurrentLinkedDeque<>();
-        this.activeBuffer = new ArrayList<>();
-    }
-
-    /**
-     * Submit a callback to run.
-     *
-     * @param task the callback.
-     */
-    public void submit(ThrowingRunnable<? extends Exception> task) {
-        synchronized (activeBufferLock) {
-            activeBuffer.add(task);
-            if (activeBuffer.size() >= batchSize) {
-                callbackQueue.offerLast(activeBuffer);
-                activeBuffer = new ArrayList<>(batchSize);
-            }
-        }
-        if (currentCallbacks.getAndIncrement() == 0) {
-            // Only when the single first callback is inserted, the mail 
should be inserted if not
-            // exist. Otherwise, the #runBatch() from task threads will keep 
the mail exist.
-            insertMail(false, true);
-        }
-    }
-
-    /**
-     * Insert a mail for a batch of mails.
-     *
-     * @param force force check if there should insert a callback regardless 
of the #hasMail flag.
-     * @param notify should notify the AEC (typically on the task thread) if 
new mail inserted.
-     */
-    private void insertMail(boolean force, boolean notify) {
-        // This method will be invoked via all ForSt I/O threads(from #submit) 
or task thread (from
-        // #runBatch()). The #hasMail flag should be protected by synchronized.
-        synchronized (this) {
-            if (force || !hasMail) {
-                if (currentCallbacks.get() > 0) {
-                    hasMail = true;
-                    mailboxExecutor.execute(
-                            this::runBatch, "Batch running callback of state 
requests");
-                    if (notify) {
-                        notifyNewMail();
-                    }
-                } else {
-                    hasMail = false;
-                }
-            }
-        }
-    }
-
-    /**
-     * Run at most a full batch of callbacks. If there has not been a full 
batch of callbacks, run
-     * current callbacks in active buffer.
-     */
-    public void runBatch() throws Exception {
-        ArrayList<ThrowingRunnable<? extends Exception>> batch = 
callbackQueue.poll();
-        if (batch == null) {
-            synchronized (activeBufferLock) {
-                if (!activeBuffer.isEmpty()) {
-                    batch = activeBuffer;
-                    activeBuffer = new ArrayList<>(batchSize);
-                }
-            }
-        }
-        if (batch != null) {
-            for (ThrowingRunnable<? extends Exception> task : batch) {
-                task.run();
-            }
-            currentCallbacks.addAndGet(-batch.size());
-        }
-        // If we are on the task thread, there is no need to notify.
-        insertMail(true, false);
-    }
-
-    private void notifyNewMail() {
-        if (newMailNotify != null) {
-            newMailNotify.run();
-        }
-    }
-
-    public boolean isHasMail() {
-        return hasMail;
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapper.java
new file mode 100644
index 00000000000..0408ca02a5a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link org.apache.flink.core.state.StateFutureImpl.CallbackRunner} that 
gives info of {@link
+ * #isHasMail()} to the AEC and notifies new mail if needed.
+ */
+public class CallbackRunnerWrapper {
+
+    private final MailboxExecutor mailboxExecutor;
+
+    /** Counter of current callbacks. */
+    private final AtomicInteger currentCallbacks = new AtomicInteger(0);
+
+    /** The logic to notify new mails to AEC. */
+    private final Runnable newMailNotify;
+
+    CallbackRunnerWrapper(MailboxExecutor mailboxExecutor, Runnable 
newMailNotify) {
+        this.mailboxExecutor = mailboxExecutor;
+        this.newMailNotify = newMailNotify;
+    }
+
+    /**
+     * Submit a callback to run.
+     *
+     * @param task the callback.
+     */
+    public void submit(ThrowingRunnable<? extends Exception> task) {
+        mailboxExecutor.execute(
+                () -> {
+                    // -1 before the task run, since the task may query 
#isHasMail, and we don't
+                    // want to return to true before the task is finished if 
there is no else mail.
+                    currentCallbacks.decrementAndGet();
+                    task.run();
+                },
+                "Callback of state request");
+        if (currentCallbacks.getAndIncrement() == 0) {
+            notifyNewMail();
+        }
+    }
+
+    private void notifyNewMail() {
+        if (newMailNotify != null) {
+            newMailNotify.run();
+        }
+    }
+
+    public boolean isHasMail() {
+        return currentCallbacks.get() > 0;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java
index 7b874a45ec7..c02bbb2c6bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java
@@ -28,12 +28,12 @@ import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandle
 public class StateFutureFactory<K> {
 
     private final AsyncExecutionController<K> asyncExecutionController;
-    private final BatchCallbackRunner callbackRunner;
+    private final CallbackRunnerWrapper callbackRunner;
     private final AsyncFrameworkExceptionHandler exceptionHandler;
 
     StateFutureFactory(
             AsyncExecutionController<K> asyncExecutionController,
-            BatchCallbackRunner callbackRunner,
+            CallbackRunnerWrapper callbackRunner,
             AsyncFrameworkExceptionHandler exceptionHandler) {
         this.asyncExecutionController = asyncExecutionController;
         this.callbackRunner = callbackRunner;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapperTest.java
similarity index 73%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapperTest.java
index 28ef9cc7f9b..80fd895c3f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapperTest.java
@@ -30,8 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link BatchCallbackRunner}. */
-public class BatchCallbackRunnerTest {
+/** Tests for {@link CallbackRunnerWrapper}. */
+public class CallbackRunnerWrapperTest {
 
     private static final ThrowingRunnable<? extends Exception> DUMMY = () -> 
{};
 
@@ -39,7 +39,8 @@ public class BatchCallbackRunnerTest {
     void testSingleSubmit() {
         ManualMailboxExecutor executor = new ManualMailboxExecutor();
         AtomicBoolean notified = new AtomicBoolean(false);
-        BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> 
notified.set(true));
+        CallbackRunnerWrapper runner =
+                new CallbackRunnerWrapper(executor, () -> notified.set(true));
         runner.submit(DUMMY);
         assertThat(runner.isHasMail()).isTrue();
         assertThat(notified.get()).isTrue();
@@ -47,28 +48,6 @@ public class BatchCallbackRunnerTest {
         assertThat(runner.isHasMail()).isFalse();
     }
 
-    @Test
-    void testHugeBatch() {
-        ManualMailboxExecutor executor = new ManualMailboxExecutor();
-        AtomicBoolean notified = new AtomicBoolean(false);
-        BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> 
notified.set(true));
-        for (int i = 0; i < BatchCallbackRunner.DEFAULT_BATCH_SIZE + 1; i++) {
-            runner.submit(DUMMY);
-        }
-        assertThat(runner.isHasMail()).isTrue();
-        assertThat(notified.get()).isTrue();
-        executor.runOne();
-        assertThat(runner.isHasMail()).isTrue();
-        notified.set(false);
-        runner.submit(DUMMY);
-        assertThat(notified.get()).isFalse();
-        executor.runOne();
-        assertThat(runner.isHasMail()).isFalse();
-        runner.submit(DUMMY);
-        assertThat(runner.isHasMail()).isTrue();
-        assertThat(notified.get()).isTrue();
-    }
-
     /** A mailbox executor that immediately executes code in the current 
thread. */
     public static class ManualMailboxExecutor implements MailboxExecutor {
 

Reply via email to