This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 17db0fd17b add ScalingThreadPoolExecutor and use for realtime Lucene 
thread pool (#12274)
17db0fd17b is described below

commit 17db0fd17b688fe609b82697a5c3b3117d93cdba
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Wed Jan 31 13:43:01 2024 -0800

    add ScalingThreadPoolExecutor and use for realtime Lucene thread pool 
(#12274)
    
    * add ScalingThreadPoolExecutor and use for realtime Lucene thread pool
    
    * add license headers
---
 .../common/utils/ScalingThreadPoolExecutor.java    | 128 +++++++++++++++++++++
 .../utils/ScalingThreadPoolExecutorTest.java       |  69 +++++++++++
 .../RealtimeLuceneTextIndexSearcherPool.java       |   6 +-
 3 files changed, 199 insertions(+), 4 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
new file mode 100644
index 0000000000..989658a692
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutor.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
+
+
+/**
+ * ScalingThreadPoolExecutor is an auto-scaling ThreadPoolExecutor. If there 
is no available thread for a new task,
+ * a new thread will be created by the internal ThreadPoolExecutor to process 
the task (up to maximumPoolSize). If
+ * there is an available thread, no additional thread will be created.
+ *
+ * This is done by creating a ScalingQueue that will 'reject' a new task if 
there are no available threads, forcing
+ * the pool to create a new thread. The rejection is then handled to queue the 
task anyway.
+ *
+ * This differs from the plain ThreadPoolExecutor implementation which does 
not create new threads if the queue (not
+ * thread pool) has capacity. For a more complete explanation, see:
+ * 
https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
+ */
+public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
+  private final AtomicInteger _activeCount = new AtomicInteger();
+
+  public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
keepAliveTime, TimeUnit unit,
+      BlockingQueue<Runnable> workQueue) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+  }
+
+  @Override
+  public int getActiveCount() {
+    return _activeCount.get();
+  }
+
+  @Override
+  protected void beforeExecute(Thread t, Runnable r) {
+    _activeCount.incrementAndGet();
+  }
+
+  @Override
+  protected void afterExecute(Runnable r, Throwable t) {
+    _activeCount.decrementAndGet();
+  }
+
+  /**
+   * Creates a new ScalingThreadPoolExecutor
+   *
+   * @param min minimum pool size
+   * @param max maximum pool size
+   * @param keepAliveTime idle thread keepAliveTime (milliseconds)
+   * @return auto-scaling ExecutorService
+   */
+  public static ExecutorService newScalingThreadPool(int min, int max, long 
keepAliveTime) {
+    ScalingQueue<Runnable> queue = new ScalingQueue<>();
+    ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(min, max, 
keepAliveTime, TimeUnit.MILLISECONDS, queue);
+    executor.setRejectedExecutionHandler(new ForceQueuePolicy());
+    queue.setThreadPoolExecutor(executor);
+    return executor;
+  }
+
+  /**
+   * Used to handle queue rejections. The policy ensures we still queue the 
Runnable, and the rejection ensures the pool
+   * will be expanded if necessary
+   */
+  static class ForceQueuePolicy implements RejectedExecutionHandler {
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      try {
+        executor.getQueue().put(r);
+      } catch (InterruptedException e) {
+        // should never happen since we never wait
+        throw new RejectedExecutionException(e);
+      }
+    }
+  }
+
+  /**
+   * Used to reject new elements if there is no available thread. Rejections 
will be handled by {@link ForceQueuePolicy}
+   */
+  static class ScalingQueue<E> extends LinkedBlockingQueue<E> {
+
+    private ThreadPoolExecutor _executor;
+
+    // Creates a queue of size Integer.MAX_SIZE
+    public ScalingQueue() {
+      super();
+    }
+
+    // Sets the executor this queue belongs to
+    public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
+      _executor = executor;
+    }
+
+    /**
+     * Inserts the specified element at the tail of this queue if there is at 
least one available thread
+     * to run the current task. If all pool threads are actively busy, it 
rejects the offer.
+     *
+     * @param e the element to add.
+     * @return true if it was possible to add the element to this queue, else 
false
+     */
+    @Override
+    public boolean offer(@Nonnull E e) {
+      int allWorkingThreads = _executor.getActiveCount() + super.size();
+      return allWorkingThreads < _executor.getPoolSize() && super.offer(e);
+    }
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
new file mode 100644
index 0000000000..5a1203f1d2
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/ScalingThreadPoolExecutorTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class ScalingThreadPoolExecutorTest {
+
+  @Test
+  public void testCreateThreadPerRunnable() {
+    ThreadPoolExecutor executorService = (ThreadPoolExecutor) 
ScalingThreadPoolExecutor.newScalingThreadPool(0, 5, 500);
+    assertEquals(executorService.getLargestPoolSize(), 0);
+    for (int i = 0; i < 5; i++) {
+      executorService.submit(getSleepingRunnable());
+    }
+    assertTrue(executorService.getLargestPoolSize() >= 2);
+  }
+
+  @Test
+  public void testCreateThreadsUpToMax() {
+    ThreadPoolExecutor executorService = (ThreadPoolExecutor) 
ScalingThreadPoolExecutor.newScalingThreadPool(0, 5, 500);
+    for (int i = 0; i < 10; i++) {
+      executorService.submit(getSleepingRunnable());
+    }
+    assertEquals(executorService.getLargestPoolSize(), 5);
+  }
+
+  @Test
+  public void testScaleDownAfterDelay() {
+    ThreadPoolExecutor executorService = (ThreadPoolExecutor) 
ScalingThreadPoolExecutor.newScalingThreadPool(0, 5, 500);
+    for (int i = 0; i < 2; i++) {
+      executorService.submit(getSleepingRunnable());
+    }
+    TestUtils.waitForCondition(aVoid -> executorService.getPoolSize() == 0, 
2000,
+        "Timed out waiting for thread pool to scale down");
+  }
+
+  private Runnable getSleepingRunnable() {
+    return () -> {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
index d1372106e0..9eb1126f4a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
@@ -19,9 +19,7 @@
 package org.apache.pinot.segment.local.realtime.impl.invertedindex;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.ScalingThreadPoolExecutor;
 
 
 /**
@@ -35,7 +33,7 @@ public class RealtimeLuceneTextIndexSearcherPool {
   private static ExecutorService _executorService;
 
   private RealtimeLuceneTextIndexSearcherPool(int size) {
-    _executorService = new ThreadPoolExecutor(0, size, 500, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+    _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 
500);
   }
 
   public static RealtimeLuceneTextIndexSearcherPool getInstance() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to