This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 39e7a91733 fix: throw reject when SingleThreadExecutor drainTo in progress and queue is empty (#4488) 39e7a91733 is described below commit 39e7a9173359bc5a75929651f590f5f40b672eb4 Author: Zixuan Liu <node...@gmail.com> AuthorDate: Mon Apr 28 11:44:29 2025 +0800 fix: throw reject when SingleThreadExecutor drainTo in progress and queue is empty (#4488) Fix Issue #4465 Motivaction In SingleThreadExecutor, the runner drains all tasks from the queue into localTasks. Although the queue is empty in memory at this point, the tasks are still pending execution in localTasks—so logically, the queue is still "full." Calling execute() during this phase should not enqueue a new runnable into the queue, as doing so would exceed the intended capacity. This can lead to increased memory usage and potential OutOfMemory (OOM) issues. Changes To address this, we introduce a variable to track the total number of pending runnables. This counter is used to control whether a new task should be added: The counter is incremented when a runnable is added to the queue. The counter is decremented when a runnable is actually executed. This ensures accurate tracking of pending tasks and prevents overfilling the logical task queue. --- .../common/util/SingleThreadExecutor.java | 93 ++++++++++++++++++++-- .../common/util/TestSingleThreadExecutor.java | 71 ++++++++++++++--- 2 files changed, 148 insertions(+), 16 deletions(-) diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java index 3c514ebbda..d48d8e3613 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java @@ -18,6 +18,7 @@ package org.apache.bookkeeper.common.util; +import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.List; @@ -29,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -54,6 +56,11 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe private final LongAdder tasksRejected = new LongAdder(); private final LongAdder tasksFailed = new LongAdder(); + private final int maxQueueCapacity; + private static final AtomicIntegerFieldUpdater<SingleThreadExecutor> pendingTaskCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(SingleThreadExecutor.class, "pendingTaskCount"); + private volatile int pendingTaskCount = 0; + enum State { Running, Shutdown, @@ -80,6 +87,8 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe } else { this.queue = new GrowableMpScArrayConsumerBlockingQueue<>(); } + this.maxQueueCapacity = maxQueueCapacity; + this.runner = tf.newThread(this); this.state = State.Running; this.rejectExecution = rejectExecution; @@ -144,6 +153,8 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe tasksFailed.increment(); log.error("Error while running task: {}", t.getMessage(), t); } + } finally { + decrementPendingTaskCount(1); } return true; @@ -162,7 +173,8 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe this.state = State.Shutdown; this.runner.interrupt(); List<Runnable> remainingTasks = new ArrayList<>(); - queue.drainTo(remainingTasks); + int n = queue.drainTo(remainingTasks); + decrementPendingTaskCount(n); return remainingTasks; } @@ -204,20 +216,45 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe @Override public void execute(Runnable r) { + executeRunnableOrList(r, null); + } + + @VisibleForTesting + void executeRunnableOrList(Runnable runnable, List<Runnable> runnableList) { if (state != State.Running) { throw new RejectedExecutionException("Executor is shutting down"); } + boolean hasSingle = runnable != null; + boolean hasList = runnableList != null && !runnableList.isEmpty(); + + if (hasSingle == hasList) { + // Both are provided or both are missing + throw new IllegalArgumentException("Provide either 'runnable' or a non-empty 'runnableList', not both."); + } + try { if (!rejectExecution) { - queue.put(r); - tasksCount.increment(); - } else { - if (queue.offer(r)) { + if (hasSingle) { + queue.put(runnable); tasksCount.increment(); } else { - tasksRejected.increment(); - throw new ExecutorRejectedException("Executor queue is full"); + for (Runnable task : runnableList) { + queue.put(task); + tasksCount.increment(); + } + } + } else { + int count = runnable != null ? 1 : runnableList.size(); + incrementPendingTaskCount(count); + boolean success = hasSingle + ? queue.offer(runnable) + : queue.addAll(runnableList); + if (success) { + tasksCount.add(count); + } else { + decrementPendingTaskCount(count); + reject(); } } } catch (InterruptedException e) { @@ -225,6 +262,43 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe } } + private void incrementPendingTaskCount(int count) { + if (maxQueueCapacity <= 0) { + return; // Unlimited capacity + } + + if (count < 0) { + throw new IllegalArgumentException("Count must be non-negative"); + } + + int oldPendingTaskCount = pendingTaskCountUpdater.getAndAccumulate(this, count, + (curr, inc) -> (curr + inc > maxQueueCapacity) ? curr : curr + inc); + + if (oldPendingTaskCount + count > maxQueueCapacity) { + reject(); + } + } + + private void decrementPendingTaskCount(int count) { + if (maxQueueCapacity <= 0) { + return; // Unlimited capacity + } + + if (count < 0) { + throw new IllegalArgumentException("Count must be non-negative"); + } + + int currentPendingCount = pendingTaskCountUpdater.addAndGet(this, -count); + if (log.isDebugEnabled()) { + log.debug("Released {} task(s), current pending count: {}", count, currentPendingCount); + } + } + + private void reject() { + tasksRejected.increment(); + throw new ExecutorRejectedException("Executor queue is full"); + } + public void registerMetrics(StatsLogger statsLogger) { // Register gauges statsLogger.scopeLabel("thread", runner.getName()) @@ -289,6 +363,11 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe }); } + @VisibleForTesting + int getPendingTaskCount() { + return pendingTaskCountUpdater.get(this); + } + private static class ExecutorRejectedException extends RejectedExecutionException { private ExecutorRejectedException(String msg) { diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java index 671318de6e..ed72704e97 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java @@ -20,10 +20,14 @@ package org.apache.bookkeeper.common.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.Lists; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; @@ -83,20 +87,23 @@ public class TestSingleThreadExecutor { CountDownLatch startedLatch = new CountDownLatch(1); for (int i = 0; i < 10; i++) { + int n = i; ste.execute(() -> { - startedLatch.countDown(); - - try { - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - // ignore + if (n == 0) { + startedLatch.countDown(); + } else { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // ignore + } } }); - - // Wait until the first task is already running in the thread - startedLatch.await(); } + // Wait until the first task is already running in the thread + startedLatch.await(); + // Next task should go through, because the runner thread has already pulled out 1 item from the // queue: the first tasks which is currently stuck ste.execute(() -> { @@ -116,6 +123,52 @@ public class TestSingleThreadExecutor { assertEquals(0, ste.getFailedTasksCount()); } + @Test + public void testRejectWhenDrainToInProgressAndQueueIsEmpty() throws Exception { + @Cleanup("shutdownNow") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, true); + + CountDownLatch waitedLatch = new CountDownLatch(1); + List<Runnable> tasks = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + tasks.add(() -> { + try { + // Block task to simulate an active, long-running task. + waitedLatch.await(); + } catch (Exception e) { + // ignored + } + }); + } + ste.executeRunnableOrList(null, tasks); + + Awaitility.await().pollDelay(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(10, ste.getPendingTaskCount())); + + // Now the queue is really full and should reject tasks. + assertThrows(RejectedExecutionException.class, () -> ste.execute(() -> { + })); + + assertEquals(10, ste.getPendingTaskCount()); + assertEquals(1, ste.getRejectedTasksCount()); + assertEquals(0, ste.getFailedTasksCount()); + + // Now we can unblock the waited tasks. + waitedLatch.countDown(); + + // Check the tasks are completed. + Awaitility.await().pollDelay(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(0, ste.getPendingTaskCount())); + + // Invalid cases - should throw IllegalArgumentException. + assertThrows(IllegalArgumentException.class, () -> ste.executeRunnableOrList(null, null)); + assertThrows(IllegalArgumentException.class, () -> ste.executeRunnableOrList(null, Collections.emptyList())); + assertThrows(IllegalArgumentException.class, () -> ste.executeRunnableOrList(() -> { + }, Lists.newArrayList(() -> { + }))); + } + @Test public void testBlockWhenQueueIsFull() throws Exception { @Cleanup("shutdown")