This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0fc838fadb MSE throttling: Set a hard limit on number of MSE threads
(#15143)
0fc838fadb is described below
commit 0fc838fadbca80c6484ed7170470c72f91814d9e
Author: Alberto Bastos <[email protected]>
AuthorDate: Mon Mar 3 15:29:48 2025 +0100
MSE throttling: Set a hard limit on number of MSE threads (#15143)
---
.../apache/pinot/query/runtime/QueryRunner.java | 12 ++-
.../pinot/spi/executor/HardLimitExecutor.java | 100 +++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
.../pinot/spi/executor/HardLimitExecutorTest.java | 62 +++++++++++++
4 files changed, 174 insertions(+), 3 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6fc87f0ed9..feefed0f1b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -73,6 +73,7 @@ import
org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
+import org.apache.pinot.spi.executor.HardLimitExecutor;
import org.apache.pinot.spi.trace.LoggerConstants;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
@@ -174,9 +175,14 @@ public class QueryRunner {
String joinOverflowModeStr =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
_joinOverflowMode = joinOverflowModeStr != null ?
JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
- _executorService =
- ExecutorServiceUtils.create(config,
Server.MULTISTAGE_EXECUTOR_CONFIG_PREFIX, "query-runner-on-" + port,
- Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE);
+ _executorService = ExecutorServiceUtils.create(config,
Server.MULTISTAGE_EXECUTOR_CONFIG_PREFIX,
+ "query-runner-on-" + port, Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE);
+
+ int hardLimit = HardLimitExecutor.getMultiStageExecutorHardLimit(config);
+ if (hardLimit > 0) {
+ _executorService = new HardLimitExecutor(hardLimit, _executorService);
+ }
+
_opChainScheduler = new OpChainSchedulerService(_executorService);
_mailboxService = new MailboxService(hostname, port, config, tlsConfig);
try {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
new file mode 100644
index 0000000000..3a83fbb5b3
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * An Executor that allows a maximum of tasks running at the same time,
rejecting immediately any excess.
+ */
+public class HardLimitExecutor extends DecoratorExecutorService {
+
+ private final AtomicInteger _running;
+ private final int _max;
+
+ public HardLimitExecutor(int max, ExecutorService executorService) {
+ super(executorService);
+ _running = new AtomicInteger(0);
+ _max = max;
+ }
+
+ /**
+ * Returns the hard limit of the number of threads that can be used by the
multi-stage executor.
+ * @param config Pinot configuration
+ * @return hard limit of the number of threads that can be used by the
multi-stage executor (no hard limit if <= 0)
+ */
+ public static int getMultiStageExecutorHardLimit(PinotConfiguration config) {
+ try {
+ int maxThreads = Integer.parseInt(config.getProperty(
+
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
+
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
+ ));
+ int hardLimitFactor = Integer.parseInt(config.getProperty(
+
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR,
+
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR
+ ));
+ if (maxThreads <= 0 || hardLimitFactor <= 0) {
+ return 0;
+ }
+ return maxThreads * hardLimitFactor;
+ } catch (NumberFormatException e) {
+ return
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)
+ *
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR);
+ }
+ }
+
+ protected void checkTaskAllowed() {
+ if (_running.get() >= _max) {
+ throw new IllegalStateException("Tasks limit exceeded.");
+ }
+ }
+
+ @Override
+ protected <T> Callable<T> decorate(Callable<T> task) {
+ checkTaskAllowed();
+ return () -> {
+ checkTaskAllowed();
+ _running.getAndIncrement();
+ try {
+ return task.call();
+ } finally {
+ _running.decrementAndGet();
+ }
+ };
+ }
+
+ @Override
+ protected Runnable decorate(Runnable task) {
+ checkTaskAllowed();
+ return () -> {
+ checkTaskAllowed();
+ _running.getAndIncrement();
+ try {
+ task.run();
+ } finally {
+ _running.decrementAndGet();
+ }
+ };
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 9ba8b4f85f..b0db23e488 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -248,6 +248,9 @@ public class CommonConstants {
public static final String
CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS =
"pinot.beta.multistage.engine.max.server.query.threads";
public static final String
DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS = "-1";
+ public static final String
CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR =
+
"pinot.beta.multistage.engine.max.server.query.threads.hardlimit.factor";
+ public static final String
DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR = "4";
// Preprocess throttle configs
public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM =
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
new file mode 100644
index 0000000000..4dc71ee092
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+
+
+public class HardLimitExecutorTest {
+
+ @Test
+ public void testHardLimit()
+ throws Exception {
+ HardLimitExecutor ex = new HardLimitExecutor(1,
Executors.newCachedThreadPool());
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ try {
+ ex.execute(() -> {
+ try {
+ barrier.await();
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException | BrokenBarrierException e) {
+ // do nothing
+ }
+ });
+
+ barrier.await();
+ try {
+ ex.execute(() -> {
+ // do nothing
+ });
+ fail("Should not allow more than 1 task");
+ } catch (Exception e) {
+ // as expected
+ assertEquals("Tasks limit exceeded.", e.getMessage());
+ }
+ } finally {
+ ex.shutdownNow();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]