This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch worktree-rpc-thread-opt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9acb1ea1caaca9831a034c4cc507d15b6f511ae4 Author: JackieTien97 <[email protected]> AuthorDate: Mon Apr 27 09:33:19 2026 +0800 Clean up dead RPC thread config and fix ClientPoolProperty default 1. Fix dn_selector_thread_count_of_client_manager naming mismatch: rename to dn_selector_thread_nums_of_client_manager to match CommonDescriptor 2. Remove unused dn_rpc_selector_thread_count config and its backing field rpcSelectorThreadCount in IoTDBConfig 3. Remove unused coordinator_write_executor_size config and the dead writeOperationExecutor in Coordinator/TreeModelPlanner 4. Fix ClientPoolProperty.Builder to read maxClientNumForEachNode from CommonConfig instead of hardcoded constant --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ---------------------- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 20 +------------------- .../iotdb/db/queryengine/plan/Coordinator.java | 9 --------- .../queryengine/plan/planner/TreeModelPlanner.java | 3 --- .../conf/iotdb-system.properties.template | 7 +------ .../client/property/ClientPoolProperty.java | 5 ++++- .../iotdb/commons/concurrent/ThreadName.java | 2 -- 7 files changed, 6 insertions(+), 62 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c7fc72152e2..e1b64d9d1ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -137,9 +137,6 @@ public class IoTDBConfig { /** Port which the JDBC server listens to. */ private int rpcPort = 6667; - /** Rpc Selector thread num */ - private int rpcSelectorThreadCount = 1; - /** Max concurrent client number */ private int rpcMaxConcurrentClientNum = 1000; @@ -989,9 +986,6 @@ public class IoTDBConfig { /** ThreadPool size for read operation in coordinator */ private int coordinatorReadExecutorSize = 20; - /** ThreadPool size for write operation in coordinator */ - private int coordinatorWriteExecutorSize = 50; - /** Policy of DataNodeSchemaCache eviction */ private String dataNodeSchemaCacheEvictionPolicy = "FIFO"; @@ -1823,14 +1817,6 @@ public class IoTDBConfig { this.unSeqTsFileSize = unSeqTsFileSize; } - public int getRpcSelectorThreadCount() { - return rpcSelectorThreadCount; - } - - public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) { - this.rpcSelectorThreadCount = rpcSelectorThreadCount; - } - public int getRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } @@ -3349,14 +3335,6 @@ public class IoTDBConfig { this.coordinatorReadExecutorSize = coordinatorReadExecutorSize; } - public int getCoordinatorWriteExecutorSize() { - return coordinatorWriteExecutorSize; - } - - public void setCoordinatorWriteExecutorSize(int coordinatorWriteExecutorSize) { - this.coordinatorWriteExecutorSize = coordinatorWriteExecutorSize; - } - public TEndPoint getAddressAndPort() { return new TEndPoint(rpcAddress, rpcPort); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 065583a383c..c92c136d319 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -294,7 +294,7 @@ public class IoTDBDescriptor { conf.setSelectorNumOfClientManager( Integer.parseInt( properties.getProperty( - "dn_selector_thread_count_of_client_manager", + "dn_selector_thread_nums_of_client_manager", String.valueOf(conf.getSelectorNumOfClientManager())))); conf.setRpcPort( @@ -724,18 +724,6 @@ public class IoTDBDescriptor { properties.getProperty( "0.13_data_insert_adapt", String.valueOf(conf.isEnable13DataInsertAdapt())))); - int rpcSelectorThreadNum = - Integer.parseInt( - properties.getProperty( - "dn_rpc_selector_thread_count", - Integer.toString(conf.getRpcSelectorThreadCount()))); - - if (rpcSelectorThreadNum <= 0) { - rpcSelectorThreadNum = 1; - } - - conf.setRpcSelectorThreadCount(rpcSelectorThreadNum); - int maxConcurrentClientNum = Integer.parseInt( properties.getProperty( @@ -980,12 +968,6 @@ public class IoTDBDescriptor { properties.getProperty( "coordinator_read_executor_size", Integer.toString(conf.getCoordinatorReadExecutorSize())))); - conf.setCoordinatorWriteExecutorSize( - Integer.parseInt( - properties.getProperty( - "coordinator_write_executor_size", - Integer.toString(conf.getCoordinatorWriteExecutorSize())))); - conf.setDataNodeTableSchemaCacheSize( Long.parseLong( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 9cee0e71a97..6e04e5918d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -219,7 +219,6 @@ public class Coordinator { new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory()); private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; private final ExecutorService dispatchExecutor; @@ -276,7 +275,6 @@ public class Coordinator { this.queryExecutionMap = new ConcurrentHashMap<>(); this.typeManager = new InternalTypeManager(); this.executor = getQueryExecutor(); - this.writeOperationExecutor = getWriteExecutor(); this.scheduledExecutor = getScheduledExecutor(); int dispatchThreadNum = Math.max(20, Runtime.getRuntime().availableProcessors() * 2); this.dispatchExecutor = @@ -410,7 +408,6 @@ public class Coordinator { new TreeModelPlanner( statement, executor, - writeOperationExecutor, scheduledExecutor, partitionFetcher, schemaFetcher, @@ -791,12 +788,6 @@ public class Coordinator { coordinatorReadExecutorSize, ThreadName.MPP_COORDINATOR_EXECUTOR_POOL.getName()); } - private ExecutorService getWriteExecutor() { - int coordinatorWriteExecutorSize = CONFIG.getCoordinatorWriteExecutorSize(); - return IoTDBThreadPoolFactory.newFixedThreadPool( - coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName()); - } - private ScheduledExecutorService getScheduledExecutor() { return IoTDBThreadPoolFactory.newScheduledThreadPool( COORDINATOR_SCHEDULED_EXECUTOR_SIZE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index f2d901c1540..756f17148ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -59,7 +59,6 @@ public class TreeModelPlanner implements IPlanner { private final Statement statement; private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; private final IPartitionFetcher partitionFetcher; @@ -75,7 +74,6 @@ public class TreeModelPlanner implements IPlanner { public TreeModelPlanner( Statement statement, ExecutorService executor, - ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, @@ -84,7 +82,6 @@ public class TreeModelPlanner implements IPlanner { asyncInternalServiceClientManager) { this.statement = statement; this.executor = executor; - this.writeOperationExecutor = writeOperationExecutor; this.scheduledExecutor = scheduledExecutor; this.partitionFetcher = partitionFetcher; this.schemaFetcher = schemaFetcher; diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 378a6226cbf..1f6c388c4c7 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -523,11 +523,6 @@ dn_rpc_thrift_compression_enable=false # this feature is under development, set this as false before it is done. dn_rpc_advanced_compression_enable=false -# the number of rpc selector -# effectiveMode: restart -# Datatype: int -dn_rpc_selector_thread_count=1 - # The maximum number of concurrent clients that can be connected to the dataNode. # effectiveMode: restart # Datatype: int @@ -551,7 +546,7 @@ dn_connection_timeout_ms=60000 # selector thread (TAsyncClientManager) nums for async thread in a clientManager # effectiveMode: restart # Datatype: int -dn_selector_thread_count_of_client_manager=1 +dn_selector_thread_nums_of_client_manager=1 # The maximum number of clients that can be allocated for a node in a clientManager. # When the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java index 5bb7c22ee49..b309985b1d7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.client.property; +import org.apache.iotdb.commons.conf.CommonDescriptor; + import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; import java.time.Duration; @@ -49,7 +51,8 @@ public class ClientPoolProperty<V> { * the maximum number of clients that can be allocated for a node. When some clients are idle * for more than {@code maxIdleTimeForClient}, they will be cleaned up. */ - private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxClientNumForEachNode = + CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode(); /** * the minimum amount of time a client may sit idle in the pool before it is eligible for diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 81f2aa7156c..12a0dba8840 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -45,7 +45,6 @@ public enum ThreadName { MPP_COORDINATOR_EXECUTOR_POOL("MPP-Coordinator-Executor"), DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"), DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"), - MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"), ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"), // -------------------------- Compaction -------------------------- COMPACTION_WORKER("Compaction-Worker"), @@ -231,7 +230,6 @@ public enum ThreadName { MPP_COORDINATOR_EXECUTOR_POOL, DATANODE_INTERNAL_RPC_SERVICE, DATANODE_INTERNAL_RPC_PROCESSOR, - MPP_COORDINATOR_WRITE_EXECUTOR, ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL)); private static final Set<ThreadName> compactionThreadNames = new HashSet<>(Arrays.asList(COMPACTION_WORKER, COMPACTION_SUB_TASK, COMPACTION_SCHEDULE));
