This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 36ce140f18 fix race condition in `ScalingThreadPoolExecutor` (#13360)
36ce140f18 is described below
commit 36ce140f18429f5f0505704b9a25e022d1042c9e
Author: Christopher Peck <[email protected]>
AuthorDate: Tue Jun 11 16:44:41 2024 -0700
fix race condition in `ScalingThreadPoolExecutor` (#13360)
---
.../common/utils/ScalingThreadPoolExecutor.java | 45 +++++++++++++++-------
.../utils/ScalingThreadPoolExecutorTest.java | 15 ++++++++
2 files changed, 46 insertions(+), 14 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
index 989658a692..ea4f00a8e8 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
@@ -26,19 +26,20 @@ import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
/**
* ScalingThreadPoolExecutor is an auto-scaling ThreadPoolExecutor. If there
is no available thread for a new task,
* a new thread will be created by the internal ThreadPoolExecutor to process
the task (up to maximumPoolSize). If
* there is an available thread, no additional thread will be created.
- *
+ * <p>
* This is done by creating a ScalingQueue that will 'reject' a new task if
there are no available threads, forcing
* the pool to create a new thread. The rejection is then handled to queue the
task anyway.
- *
+ * <p>
* This differs from the plain ThreadPoolExecutor implementation which does
not create new threads if the queue (not
- * thread pool) has capacity. For a more complete explanation, see:
+ * thread pool) has capacity. For a more complete explanation, see (note: the
original version includes a race
+ * condition, and the implementation here differs slightly):
*
https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
*/
public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
@@ -76,7 +77,6 @@ public class ScalingThreadPoolExecutor extends
ThreadPoolExecutor {
ScalingQueue<Runnable> queue = new ScalingQueue<>();
ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(min, max,
keepAliveTime, TimeUnit.MILLISECONDS, queue);
executor.setRejectedExecutionHandler(new ForceQueuePolicy());
- queue.setThreadPoolExecutor(executor);
return executor;
}
@@ -100,29 +100,46 @@ public class ScalingThreadPoolExecutor extends
ThreadPoolExecutor {
*/
static class ScalingQueue<E> extends LinkedBlockingQueue<E> {
- private ThreadPoolExecutor _executor;
+ AtomicInteger _currentIdleThreadCount = new AtomicInteger(0);
// Creates a queue of size Integer.MAX_VALUE
public ScalingQueue() {
super();
}
- // Sets the executor this queue belongs to
- public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
- _executor = executor;
+ @Override
+ public E take()
+ throws InterruptedException {
+ _currentIdleThreadCount.incrementAndGet();
+ try {
+ return super.take();
+ } finally {
+ _currentIdleThreadCount.decrementAndGet();
+ }
+ }
+
+ @Override
+ @Nullable
+ public E poll(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ _currentIdleThreadCount.incrementAndGet();
+ try {
+ return super.poll(timeout, unit);
+ } finally {
+ _currentIdleThreadCount.decrementAndGet();
+ }
}
/**
- * Inserts the specified element at the tail of this queue if there is at
least one available thread
- * to run the current task. If all pool threads are actively busy, it
rejects the offer.
+ * Inserts the specified element at the tail of this queue if there is at
least one idle thread
+ * to run the current task. If all pool threads are actively busy, the
offer is rejected.
*
* @param e the element to add.
* @return true if it was possible to add the element to this queue, else
false
*/
@Override
- public boolean offer(@Nonnull E e) {
- int allWorkingThreads = _executor.getActiveCount() + super.size();
- return allWorkingThreads < _executor.getPoolSize() && super.offer(e);
+ public boolean offer(E e) {
+ return _currentIdleThreadCount.get() > 0 && super.offer(e);
}
}
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
index 5a1203f1d2..7f0e892a46 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
@@ -57,6 +57,21 @@ public class ScalingThreadPoolExecutorTest {
"Timed out waiting for thread pool to scale down");
}
+ @Test
+ public void testRapidSubmission() {
+ ThreadPoolExecutor executorService = (ThreadPoolExecutor)
ScalingThreadPoolExecutor.newScalingThreadPool(0, 4, 0L);
+ Runnable r1 = getSleepingRunnable();
+ Runnable r2 = getSleepingRunnable();
+
+ // When Runnables are submitted rapidly, the pool should scale up to 2
threads. The previous test cases can fail
+ // to catch such a race condition because Runnables are initialized as
they are submitted, which introduced enough
+ // delay to avoid the condition
+ executorService.submit(r1);
+ executorService.submit(r2);
+ TestUtils.waitForCondition(aVoid -> executorService.getPoolSize() == 2,
2000,
+ "Timed out waiting for thread pool to scale up");
+ }
+
private Runnable getSleepingRunnable() {
return () -> {
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]