This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 39e7a91733 fix: throw reject when SingleThreadExecutor drainTo in
progress and queue is empty (#4488)
39e7a91733 is described below
commit 39e7a9173359bc5a75929651f590f5f40b672eb4
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Apr 28 11:44:29 2025 +0800
fix: throw reject when SingleThreadExecutor drainTo in progress and queue
is empty (#4488)
Fix Issue #4465
Motivaction
In SingleThreadExecutor, the runner drains all tasks from the queue into
localTasks. Although the queue is empty in memory at this point, the tasks are
still pending execution in localTasks—so logically, the queue is still "full."
Calling execute() during this phase should not enqueue a new runnable into
the queue, as doing so would exceed the intended capacity. This can lead to
increased memory usage and potential OutOfMemory (OOM) issues.
Changes
To address this, we introduce a variable to track the total number of
pending runnables. This counter is used to control whether a new task should be
added:
The counter is incremented when a runnable is added to the queue.
The counter is decremented when a runnable is actually executed.
This ensures accurate tracking of pending tasks and prevents overfilling
the logical task queue.
---
.../common/util/SingleThreadExecutor.java | 93 ++++++++++++++++++++--
.../common/util/TestSingleThreadExecutor.java | 71 ++++++++++++++---
2 files changed, 148 insertions(+), 16 deletions(-)
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
index 3c514ebbda..d48d8e3613 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.common.util;
+import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
@@ -29,6 +30,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -54,6 +56,11 @@ public class SingleThreadExecutor extends
AbstractExecutorService implements Exe
private final LongAdder tasksRejected = new LongAdder();
private final LongAdder tasksFailed = new LongAdder();
+ private final int maxQueueCapacity;
+ private static final AtomicIntegerFieldUpdater<SingleThreadExecutor>
pendingTaskCountUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(SingleThreadExecutor.class,
"pendingTaskCount");
+ private volatile int pendingTaskCount = 0;
+
enum State {
Running,
Shutdown,
@@ -80,6 +87,8 @@ public class SingleThreadExecutor extends
AbstractExecutorService implements Exe
} else {
this.queue = new GrowableMpScArrayConsumerBlockingQueue<>();
}
+ this.maxQueueCapacity = maxQueueCapacity;
+
this.runner = tf.newThread(this);
this.state = State.Running;
this.rejectExecution = rejectExecution;
@@ -144,6 +153,8 @@ public class SingleThreadExecutor extends
AbstractExecutorService implements Exe
tasksFailed.increment();
log.error("Error while running task: {}", t.getMessage(), t);
}
+ } finally {
+ decrementPendingTaskCount(1);
}
return true;
@@ -162,7 +173,8 @@ public class SingleThreadExecutor extends
AbstractExecutorService implements Exe
this.state = State.Shutdown;
this.runner.interrupt();
List<Runnable> remainingTasks = new ArrayList<>();
- queue.drainTo(remainingTasks);
+ int n = queue.drainTo(remainingTasks);
+ decrementPendingTaskCount(n);
return remainingTasks;
}
@@ -204,20 +216,45 @@ public class SingleThreadExecutor extends
AbstractExecutorService implements Exe
@Override
public void execute(Runnable r) {
+ executeRunnableOrList(r, null);
+ }
+
+ @VisibleForTesting
+ void executeRunnableOrList(Runnable runnable, List<Runnable> runnableList)
{
if (state != State.Running) {
throw new RejectedExecutionException("Executor is shutting down");
}
+ boolean hasSingle = runnable != null;
+ boolean hasList = runnableList != null && !runnableList.isEmpty();
+
+ if (hasSingle == hasList) {
+ // Both are provided or both are missing
+ throw new IllegalArgumentException("Provide either 'runnable' or a
non-empty 'runnableList', not both.");
+ }
+
try {
if (!rejectExecution) {
- queue.put(r);
- tasksCount.increment();
- } else {
- if (queue.offer(r)) {
+ if (hasSingle) {
+ queue.put(runnable);
tasksCount.increment();
} else {
- tasksRejected.increment();
- throw new ExecutorRejectedException("Executor queue is
full");
+ for (Runnable task : runnableList) {
+ queue.put(task);
+ tasksCount.increment();
+ }
+ }
+ } else {
+ int count = runnable != null ? 1 : runnableList.size();
+ incrementPendingTaskCount(count);
+ boolean success = hasSingle
+ ? queue.offer(runnable)
+ : queue.addAll(runnableList);
+ if (success) {
+ tasksCount.add(count);
+ } else {
+ decrementPendingTaskCount(count);
+ reject();
}
}
} catch (InterruptedException e) {
@@ -225,6 +262,43 @@ public class SingleThreadExecutor extends
AbstractExecutorService implements Exe
}
}
+ private void incrementPendingTaskCount(int count) {
+ if (maxQueueCapacity <= 0) {
+ return; // Unlimited capacity
+ }
+
+ if (count < 0) {
+ throw new IllegalArgumentException("Count must be non-negative");
+ }
+
+ int oldPendingTaskCount =
pendingTaskCountUpdater.getAndAccumulate(this, count,
+ (curr, inc) -> (curr + inc > maxQueueCapacity) ? curr : curr +
inc);
+
+ if (oldPendingTaskCount + count > maxQueueCapacity) {
+ reject();
+ }
+ }
+
+ private void decrementPendingTaskCount(int count) {
+ if (maxQueueCapacity <= 0) {
+ return; // Unlimited capacity
+ }
+
+ if (count < 0) {
+ throw new IllegalArgumentException("Count must be non-negative");
+ }
+
+ int currentPendingCount = pendingTaskCountUpdater.addAndGet(this,
-count);
+ if (log.isDebugEnabled()) {
+ log.debug("Released {} task(s), current pending count: {}", count,
currentPendingCount);
+ }
+ }
+
+ private void reject() {
+ tasksRejected.increment();
+ throw new ExecutorRejectedException("Executor queue is full");
+ }
+
public void registerMetrics(StatsLogger statsLogger) {
// Register gauges
statsLogger.scopeLabel("thread", runner.getName())
@@ -289,6 +363,11 @@ public class SingleThreadExecutor extends
AbstractExecutorService implements Exe
});
}
+ @VisibleForTesting
+ int getPendingTaskCount() {
+ return pendingTaskCountUpdater.get(this);
+ }
+
private static class ExecutorRejectedException extends
RejectedExecutionException {
private ExecutorRejectedException(String msg) {
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
index 671318de6e..ed72704e97 100644
---
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
@@ -20,10 +20,14 @@ package org.apache.bookkeeper.common.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
@@ -83,20 +87,23 @@ public class TestSingleThreadExecutor {
CountDownLatch startedLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
+ int n = i;
ste.execute(() -> {
- startedLatch.countDown();
-
- try {
- barrier.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- // ignore
+ if (n == 0) {
+ startedLatch.countDown();
+ } else {
+ try {
+ barrier.await();
+ } catch (InterruptedException | BrokenBarrierException e) {
+ // ignore
+ }
}
});
-
- // Wait until the first task is already running in the thread
- startedLatch.await();
}
+ // Wait until the first task is already running in the thread
+ startedLatch.await();
+
// Next task should go through, because the runner thread has already
pulled out 1 item from the
// queue: the first tasks which is currently stuck
ste.execute(() -> {
@@ -116,6 +123,52 @@ public class TestSingleThreadExecutor {
assertEquals(0, ste.getFailedTasksCount());
}
+ @Test
+ public void testRejectWhenDrainToInProgressAndQueueIsEmpty() throws
Exception {
+ @Cleanup("shutdownNow")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY,
10, true);
+
+ CountDownLatch waitedLatch = new CountDownLatch(1);
+ List<Runnable> tasks = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ tasks.add(() -> {
+ try {
+ // Block task to simulate an active, long-running task.
+ waitedLatch.await();
+ } catch (Exception e) {
+ // ignored
+ }
+ });
+ }
+ ste.executeRunnableOrList(null, tasks);
+
+ Awaitility.await().pollDelay(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(10,
ste.getPendingTaskCount()));
+
+ // Now the queue is really full and should reject tasks.
+ assertThrows(RejectedExecutionException.class, () -> ste.execute(() ->
{
+ }));
+
+ assertEquals(10, ste.getPendingTaskCount());
+ assertEquals(1, ste.getRejectedTasksCount());
+ assertEquals(0, ste.getFailedTasksCount());
+
+ // Now we can unblock the waited tasks.
+ waitedLatch.countDown();
+
+ // Check the tasks are completed.
+ Awaitility.await().pollDelay(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(0,
ste.getPendingTaskCount()));
+
+ // Invalid cases - should throw IllegalArgumentException.
+ assertThrows(IllegalArgumentException.class, () ->
ste.executeRunnableOrList(null, null));
+ assertThrows(IllegalArgumentException.class, () ->
ste.executeRunnableOrList(null, Collections.emptyList()));
+ assertThrows(IllegalArgumentException.class, () ->
ste.executeRunnableOrList(() -> {
+ }, Lists.newArrayList(() -> {
+ })));
+ }
+
@Test
public void testBlockWhenQueueIsFull() throws Exception {
@Cleanup("shutdown")