This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 17db0fd17b add ScalingThreadPoolExecutor and use for realtime Lucene thread pool (#12274) 17db0fd17b is described below commit 17db0fd17b688fe609b82697a5c3b3117d93cdba Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com> AuthorDate: Wed Jan 31 13:43:01 2024 -0800 add ScalingThreadPoolExecutor and use for realtime Lucene thread pool (#12274) * add ScalingThreadPoolExecutor and use for realtime Lucene thread pool * add license headers --- .../common/utils/ScalingThreadPoolExecutor.java | 128 +++++++++++++++++++++ .../utils/ScalingThreadPoolExecutorTest.java | 69 +++++++++++ .../RealtimeLuceneTextIndexSearcherPool.java | 6 +- 3 files changed, 199 insertions(+), 4 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java new file mode 100644 index 0000000000..989658a692 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.utils; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nonnull; + + +/** + * ScalingThreadPoolExecutor is an auto-scaling ThreadPoolExecutor. If there is no available thread for a new task, + * a new thread will be created by the internal ThreadPoolExecutor to process the task (up to maximumPoolSize). If + * there is an available thread, no additional thread will be created. + * + * This is done by creating a ScalingQueue that will 'reject' a new task if there are no available threads, forcing + * the pool to create a new thread. The rejection is then handled to queue the task anyway. + * + * This differs from the plain ThreadPoolExecutor implementation which does not create new threads if the queue (not + * thread pool) has capacity. 
For a more complete explanation, see: + * https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile + */ +public class ScalingThreadPoolExecutor extends ThreadPoolExecutor { + private final AtomicInteger _activeCount = new AtomicInteger(); + + public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + public int getActiveCount() { + return _activeCount.get(); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + _activeCount.incrementAndGet(); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + _activeCount.decrementAndGet(); + } + + /** + * Creates a new ScalingThreadPoolExecutor + * + * @param min minimum pool size + * @param max maximum pool size + * @param keepAliveTime idle thread keepAliveTime (milliseconds) + * @return auto-scaling ExecutorService + */ + public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) { + ScalingQueue<Runnable> queue = new ScalingQueue<>(); + ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue); + executor.setRejectedExecutionHandler(new ForceQueuePolicy()); + queue.setThreadPoolExecutor(executor); + return executor; + } + + /** + * Used to handle queue rejections. The policy ensures we still queue the Runnable, and the rejection ensures the pool + * will be expanded if necessary + */ + static class ForceQueuePolicy implements RejectedExecutionHandler { + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + // should never happen since we never wait + throw new RejectedExecutionException(e); + } + } + } + + /** + * Used to reject new elements if there is no available thread. 
Rejections will be handled by {@link ForceQueuePolicy} + */ + static class ScalingQueue<E> extends LinkedBlockingQueue<E> { + + private ThreadPoolExecutor _executor; + + // Creates a queue of size Integer.MAX_VALUE + public ScalingQueue() { + super(); + } + + // Sets the executor this queue belongs to + public void setThreadPoolExecutor(ThreadPoolExecutor executor) { + _executor = executor; + } + + /** + * Inserts the specified element at the tail of this queue if there is at least one available thread + * to run the current task. If all pool threads are actively busy, it rejects the offer. + * + * @param e the element to add. + * @return true if it was possible to add the element to this queue, else false + */ + @Override + public boolean offer(@Nonnull E e) { + int allWorkingThreads = _executor.getActiveCount() + super.size(); + return allWorkingThreads < _executor.getPoolSize() && super.offer(e); + } + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java new file mode 100644 index 0000000000..5a1203f1d2 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import java.util.concurrent.ThreadPoolExecutor; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class ScalingThreadPoolExecutorTest { + + @Test + public void testCreateThreadPerRunnable() { + ThreadPoolExecutor executorService = (ThreadPoolExecutor) ScalingThreadPoolExecutor.newScalingThreadPool(0, 5, 500); + assertEquals(executorService.getLargestPoolSize(), 0); + for (int i = 0; i < 5; i++) { + executorService.submit(getSleepingRunnable()); + } + assertTrue(executorService.getLargestPoolSize() >= 2); + } + + @Test + public void testCreateThreadsUpToMax() { + ThreadPoolExecutor executorService = (ThreadPoolExecutor) ScalingThreadPoolExecutor.newScalingThreadPool(0, 5, 500); + for (int i = 0; i < 10; i++) { + executorService.submit(getSleepingRunnable()); + } + assertEquals(executorService.getLargestPoolSize(), 5); + } + + @Test + public void testScaleDownAfterDelay() { + ThreadPoolExecutor executorService = (ThreadPoolExecutor) ScalingThreadPoolExecutor.newScalingThreadPool(0, 5, 500); + for (int i = 0; i < 2; i++) { + executorService.submit(getSleepingRunnable()); + } + TestUtils.waitForCondition(aVoid -> executorService.getPoolSize() == 0, 2000, + "Timed out waiting for thread pool to scale down"); + } + + private Runnable getSleepingRunnable() { + return () -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } 
+ }; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java index d1372106e0..9eb1126f4a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java @@ -19,9 +19,7 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.utils.ScalingThreadPoolExecutor; /** @@ -35,7 +33,7 @@ public class RealtimeLuceneTextIndexSearcherPool { private static ExecutorService _executorService; private RealtimeLuceneTextIndexSearcherPool(int size) { - _executorService = new ThreadPoolExecutor(0, size, 500, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500); } public static RealtimeLuceneTextIndexSearcherPool getInstance() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org