This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-3682 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55394db6eda09c00dee6613257cfb9966861b701 Author: JackieTien97 <jackietie...@gmail.com> AuthorDate: Tue Jun 28 16:12:28 2022 +0800 [IOTDB-3682] Add DriverScheduler configuration into iotdb-datanode.properties --- server/src/assembly/resources/conf/iotdb-datanode.properties | 4 ++++ .../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++++++++ .../main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 ++++++++++ .../iotdb/db/mpp/execution/schedule/DriverScheduler.java | 12 ++++++++---- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties index c917204313..9cb1d4cfaf 100644 --- a/server/src/assembly/resources/conf/iotdb-datanode.properties +++ b/server/src/assembly/resources/conf/iotdb-datanode.properties @@ -598,6 +598,10 @@ timestamp_precision=ms # Datatype: int # query_timeout_threshold=60000 +# The maximum allowed concurrently executing queries +# Datatype: int +# max_allowed_concurrent_queries=1000 + # The number of sub compaction threads to be set up to perform compaction. # Currently only works for nonAligned data in cross space compaction and unseq inner space compaction. # Set to 1 when less than or equal to 0. diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 0ceae86ee6..8c06224529 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -296,6 +296,9 @@ public class IoTDBConfig { /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */ private int concurrentQueryThread = 16; + /** How many queries can be concurrently executed. When <= 0, use 1000. */ + private int maxAllowedConcurrentQueries = 1000; + /** * How many threads can concurrently read data for raw data query. When <= 0, use CPU core number. */ @@ -1324,6 +1327,14 @@ public class IoTDBConfig { this.concurrentQueryThread = concurrentQueryThread; } + public int getMaxAllowedConcurrentQueries() { + return maxAllowedConcurrentQueries; + } + + public void setMaxAllowedConcurrentQueries(int maxAllowedConcurrentQueries) { + this.maxAllowedConcurrentQueries = maxAllowedConcurrentQueries; + } + public int getConcurrentSubRawQueryThread() { return concurrentSubRawQueryThread; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 66ed7c4e9d..b50aead65c 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -476,6 +476,16 @@ public class IoTDBDescriptor { conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors()); } + conf.setMaxAllowedConcurrentQueries( + Integer.parseInt( + properties.getProperty( + "max_allowed_concurrent_queries", + Integer.toString(conf.getConcurrentQueryThread())))); + + if (conf.getMaxAllowedConcurrentQueries() <= 0) { + conf.setMaxAllowedConcurrentQueries(1000); + } + conf.setConcurrentSubRawQueryThread( Integer.parseInt( properties.getProperty( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java index a4bea2c91f..ce079f4f3c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.driver.IDriver; @@ -63,11 +64,14 @@ public class DriverScheduler implements IDriverScheduler, IService { private final Set<DriverTask> blockedTasks; private final Map<QueryId, Set<DriverTask>> queryMap; private final ITaskScheduler scheduler; - private IMPPDataExchangeManager blockManager; // TODO: init with real IMPPDataExchangeManager + private IMPPDataExchangeManager blockManager; - private static final int MAX_CAPACITY = 1000; // TODO: load from config files - private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files - private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests + private static final int MAX_CAPACITY = + IoTDBDescriptor.getInstance().getConfig().getMaxAllowedConcurrentQueries(); + private static final int WORKER_THREAD_NUM = + IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(); + private static final int QUERY_TIMEOUT_MS = + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(); private final ThreadGroup workerGroups; private final List<AbstractDriverThread> threads;