This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e3c28414a7581e9a79da33fa5904a2a6aa7d7dc9 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Mon Jan 13 18:52:52 2025 +0800 [FLINK-35412][Runtime] Remove `BatchCallbackRunner` --- .../asyncprocessing/AsyncExecutionController.java | 4 +- .../asyncprocessing/BatchCallbackRunner.java | 148 --------------------- .../asyncprocessing/CallbackRunnerWrapper.java | 73 ++++++++++ .../asyncprocessing/StateFutureFactory.java | 4 +- ...nerTest.java => CallbackRunnerWrapperTest.java} | 29 +--- 5 files changed, 81 insertions(+), 177 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 62160d24de5..b253c236dca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -69,7 +69,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab private final int batchSize; /** The runner for callbacks. Basically a wrapper of mailbox executor. */ - private final BatchCallbackRunner callbackRunner; + private final CallbackRunnerWrapper callbackRunner; /** * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the @@ -160,7 +160,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.exceptionHandler = exceptionHandler; - this.callbackRunner = new BatchCallbackRunner(mailboxExecutor, this::notifyNewMail); + this.callbackRunner = new CallbackRunnerWrapper(mailboxExecutor, this::notifyNewMail); this.stateFutureFactory = new StateFutureFactory<>(this, callbackRunner, exceptionHandler); this.stateExecutor = stateExecutor; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java deleted file mode 100644 index d106f62bfbb..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.asyncprocessing; - -import org.apache.flink.api.common.operators.MailboxExecutor; -import org.apache.flink.util.function.ThrowingRunnable; - -import javax.annotation.concurrent.GuardedBy; - -import java.util.ArrayList; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A {@link org.apache.flink.core.state.StateFutureImpl.CallbackRunner} that put one mail in {@link - * MailboxExecutor} but run multiple callbacks within one mail. - */ -public class BatchCallbackRunner { - - static final int DEFAULT_BATCH_SIZE = 3000; - - private final MailboxExecutor mailboxExecutor; - - private final int batchSize; - - /** The callbacks divided in batch. */ - private final ConcurrentLinkedDeque<ArrayList<ThrowingRunnable<? extends Exception>>> - callbackQueue; - - /** The lock to protect the active buffer (batch). */ - private final Object activeBufferLock = new Object(); - - /** The active buffer (batch) to gather incoming callbacks. */ - @GuardedBy("activeBufferLock") - private ArrayList<ThrowingRunnable<? extends Exception>> activeBuffer; - - /** Counter of current callbacks. */ - private final AtomicInteger currentCallbacks = new AtomicInteger(0); - - /** Whether there is a mail in mailbox. */ - private volatile boolean hasMail = false; - - /** The logic to notify new mails to AEC. */ - private final Runnable newMailNotify; - - BatchCallbackRunner(MailboxExecutor mailboxExecutor, Runnable newMailNotify) { - this.mailboxExecutor = mailboxExecutor; - this.newMailNotify = newMailNotify; - this.batchSize = DEFAULT_BATCH_SIZE; - this.callbackQueue = new ConcurrentLinkedDeque<>(); - this.activeBuffer = new ArrayList<>(); - } - - /** - * Submit a callback to run. - * - * @param task the callback. - */ - public void submit(ThrowingRunnable<? extends Exception> task) { - synchronized (activeBufferLock) { - activeBuffer.add(task); - if (activeBuffer.size() >= batchSize) { - callbackQueue.offerLast(activeBuffer); - activeBuffer = new ArrayList<>(batchSize); - } - } - if (currentCallbacks.getAndIncrement() == 0) { - // Only when the single first callback is inserted, the mail should be inserted if not - // exist. Otherwise, the #runBatch() from task threads will keep the mail exist. - insertMail(false, true); - } - } - - /** - * Insert a mail for a batch of mails. - * - * @param force force check if there should insert a callback regardless of the #hasMail flag. - * @param notify should notify the AEC (typically on the task thread) if new mail inserted. - */ - private void insertMail(boolean force, boolean notify) { - // This method will be invoked via all ForSt I/O threads(from #submit) or task thread (from - // #runBatch()). The #hasMail flag should be protected by synchronized. - synchronized (this) { - if (force || !hasMail) { - if (currentCallbacks.get() > 0) { - hasMail = true; - mailboxExecutor.execute( - this::runBatch, "Batch running callback of state requests"); - if (notify) { - notifyNewMail(); - } - } else { - hasMail = false; - } - } - } - } - - /** - * Run at most a full batch of callbacks. If there has not been a full batch of callbacks, run - * current callbacks in active buffer. - */ - public void runBatch() throws Exception { - ArrayList<ThrowingRunnable<? extends Exception>> batch = callbackQueue.poll(); - if (batch == null) { - synchronized (activeBufferLock) { - if (!activeBuffer.isEmpty()) { - batch = activeBuffer; - activeBuffer = new ArrayList<>(batchSize); - } - } - } - if (batch != null) { - for (ThrowingRunnable<? extends Exception> task : batch) { - task.run(); - } - currentCallbacks.addAndGet(-batch.size()); - } - // If we are on the task thread, there is no need to notify. - insertMail(true, false); - } - - private void notifyNewMail() { - if (newMailNotify != null) { - newMailNotify.run(); - } - } - - public boolean isHasMail() { - return hasMail; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapper.java new file mode 100644 index 00000000000..0408ca02a5a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapper.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A {@link org.apache.flink.core.state.StateFutureImpl.CallbackRunner} that gives info of {@link + * #isHasMail()} to the AEC and notifies new mail if needed. + */ +public class CallbackRunnerWrapper { + + private final MailboxExecutor mailboxExecutor; + + /** Counter of current callbacks. */ + private final AtomicInteger currentCallbacks = new AtomicInteger(0); + + /** The logic to notify new mails to AEC. */ + private final Runnable newMailNotify; + + CallbackRunnerWrapper(MailboxExecutor mailboxExecutor, Runnable newMailNotify) { + this.mailboxExecutor = mailboxExecutor; + this.newMailNotify = newMailNotify; + } + + /** + * Submit a callback to run. + * + * @param task the callback. + */ + public void submit(ThrowingRunnable<? extends Exception> task) { + mailboxExecutor.execute( + () -> { + // -1 before the task run, since the task may query #isHasMail, and we don't + // want to return to true before the task is finished if there is no else mail. + currentCallbacks.decrementAndGet(); + task.run(); + }, + "Callback of state request"); + if (currentCallbacks.getAndIncrement() == 0) { + notifyNewMail(); + } + } + + private void notifyNewMail() { + if (newMailNotify != null) { + newMailNotify.run(); + } + } + + public boolean isHasMail() { + return currentCallbacks.get() > 0; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java index 7b874a45ec7..c02bbb2c6bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java @@ -28,12 +28,12 @@ import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandle public class StateFutureFactory<K> { private final AsyncExecutionController<K> asyncExecutionController; - private final BatchCallbackRunner callbackRunner; + private final CallbackRunnerWrapper callbackRunner; private final AsyncFrameworkExceptionHandler exceptionHandler; StateFutureFactory( AsyncExecutionController<K> asyncExecutionController, - BatchCallbackRunner callbackRunner, + CallbackRunnerWrapper callbackRunner, AsyncFrameworkExceptionHandler exceptionHandler) { this.asyncExecutionController = asyncExecutionController; this.callbackRunner = callbackRunner; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapperTest.java similarity index 73% rename from flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapperTest.java index 28ef9cc7f9b..80fd895c3f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/CallbackRunnerWrapperTest.java @@ -30,8 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link BatchCallbackRunner}. */ -public class BatchCallbackRunnerTest { +/** Tests for {@link CallbackRunnerWrapper}. */ +public class CallbackRunnerWrapperTest { private static final ThrowingRunnable<? extends Exception> DUMMY = () -> {}; @@ -39,7 +39,8 @@ public class BatchCallbackRunnerTest { void testSingleSubmit() { ManualMailboxExecutor executor = new ManualMailboxExecutor(); AtomicBoolean notified = new AtomicBoolean(false); - BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> notified.set(true)); + CallbackRunnerWrapper runner = + new CallbackRunnerWrapper(executor, () -> notified.set(true)); runner.submit(DUMMY); assertThat(runner.isHasMail()).isTrue(); assertThat(notified.get()).isTrue(); @@ -47,28 +48,6 @@ public class BatchCallbackRunnerTest { assertThat(runner.isHasMail()).isFalse(); } - @Test - void testHugeBatch() { - ManualMailboxExecutor executor = new ManualMailboxExecutor(); - AtomicBoolean notified = new AtomicBoolean(false); - BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> notified.set(true)); - for (int i = 0; i < BatchCallbackRunner.DEFAULT_BATCH_SIZE + 1; i++) { - runner.submit(DUMMY); - } - assertThat(runner.isHasMail()).isTrue(); - assertThat(notified.get()).isTrue(); - executor.runOne(); - assertThat(runner.isHasMail()).isTrue(); - notified.set(false); - runner.submit(DUMMY); - assertThat(notified.get()).isFalse(); - executor.runOne(); - assertThat(runner.isHasMail()).isFalse(); - runner.submit(DUMMY); - assertThat(runner.isHasMail()).isTrue(); - assertThat(notified.get()).isTrue(); - } - /** A mailbox executor that immediately executes code in the current thread. */ public static class ManualMailboxExecutor implements MailboxExecutor {