This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch parallelDispatch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5121db04eb2c391c23cb10a201d6131c11c3da4c Author: shuwenwei <[email protected]> AuthorDate: Wed May 21 18:44:21 2025 +0800 topological parallel disptach read fragment instance --- .../fragment/FragmentInstanceManager.java | 10 ++ .../relational/LastQueryAggTableScanOperator.java | 3 + .../queryengine/plan/planner/TreeModelPlanner.java | 2 +- .../plan/planner/plan/PlanFragment.java | 9 ++ .../plan/planner/plan/node/PlanGraphPrinter.java | 1 + .../plan/relational/planner/TableModelPlanner.java | 2 +- .../distribute/TableModelQueryFragmentPlanner.java | 1 + .../plan/scheduler/ClusterScheduler.java | 11 +- .../scheduler/FragmentInstanceDispatcherImpl.java | 168 ++++++++++++++++++++- .../plan/scheduler/IFragInstanceDispatcher.java | 6 +- .../scheduler/load/LoadTsFileDispatcherImpl.java | 4 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 2 +- .../commons/concurrent/IoTDBThreadPoolFactory.java | 19 +++ .../iotdb/commons/concurrent/ThreadName.java | 1 + 14 files changed, 228 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index f1eed1d1f4e..97c99cab7f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -84,6 +84,8 @@ public class FragmentInstanceManager { private final ExecutorService intoOperationExecutor; private final ExecutorService modelInferenceExecutor; + private final ExecutorService dispatchExecutor; + private final MPPDataExchangeManager exchangeManager = MPPDataExchangeService.getInstance().getMPPDataExchangeManager(); @@ -104,6 +106,10 @@ public class FragmentInstanceManager { this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool( 4, ThreadName.FRAGMENT_INSTANCE_NOTIFICATION.getName()); + this.dispatchExecutor = + IoTDBThreadPoolFactory.newCachedThreadPool( + ThreadName.FRAGMENT_INSTANCE_DISPATCH.getName(), + Math.max(20, Runtime.getRuntime().availableProcessors() * 2)); this.infoCacheTime = new Duration(5, TimeUnit.MINUTES); @@ -426,6 +432,10 @@ public class FragmentInstanceManager { return modelInferenceExecutor; } + public ExecutorService getDispatchExecutor() { + return dispatchExecutor; + } + private static class InstanceHolder { private InstanceHolder() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java index 312294ff8b9..b078d7f2f34 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggr import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; @@ -96,6 +97,8 @@ public class LastQueryAggTableScanOperator extends AbstractAggTableScanOperator this.hitCachedResults = hitCachedResults; this.dbName = qualifiedObjectName.getDatabaseName(); + this.operatorContext.recordSpecifiedInfo( + PlanGraphPrinter.CACHED_DEVICE_NUMBER, Integer.toString(cachedDeviceEntries.size())); for (int i = 0; i < parameter.tableAggregators.size(); i++) { if (parameter.tableAggregators.get(i).getAccumulator() instanceof LastAccumulator) { lastTimeAggregationIdx = i; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index eaaacad64ea..f2d901c1540 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -146,7 +146,7 @@ public class TreeModelPlanner implements IPlanner { new ClusterScheduler( context, stateMachine, - distributedPlan.getInstances(), + distributedPlan, context.getQueryType(), scheduledExecutor, syncInternalServiceClientManager, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java index 44f8f23fda8..448e8e2fda0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java @@ -58,6 +58,7 @@ public class PlanFragment { // indicate whether this PlanFragment is the root of the whole Fragment-Plan-Tree or not private boolean isRoot; + private int indexInFragmentInstanceList; public PlanFragment(PlanFragmentId id, PlanNode planNodeTree) { this.id = id; @@ -65,6 +66,14 @@ public class PlanFragment { this.isRoot = false; } + public int getIndexInFragmentInstanceList() { + return indexInFragmentInstanceList; + } + + public void setIndexInFragmentInstanceList(int indexInFragmentInstanceList) { + this.indexInFragmentInstanceList = indexInFragmentInstanceList; + } + public PlanFragmentId getId() { return id; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index c26327b59b6..94c00f88582 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -116,6 +116,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter private static final String REGION_NOT_ASSIGNED = "Not Assigned"; public static final String DEVICE_NUMBER = "DeviceNumber"; + public static final String CACHED_DEVICE_NUMBER = "CachedDeviceNumber"; public static final String CURRENT_USED_MEMORY = "CurrentUsedMemory"; public static final String MAX_USED_MEMORY = "MaxUsedMemory"; public static final String MAX_RESERVED_MEMORY = "MaxReservedMemory"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java index a5765ee538c..8d7ce3cebbb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java @@ -188,7 +188,7 @@ public class TableModelPlanner implements IPlanner { new ClusterScheduler( context, stateMachine, - distributedPlan.getInstances(), + distributedPlan, context.getQueryType(), scheduledExecutor, syncInternalServiceClientManager, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index 560e8a844a2..8f7c2aed732 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -155,6 +155,7 @@ public class TableModelQueryFragmentPlanner extends AbstractFragmentParallelPlan fragmentInstance.getFragment().generateTableModelTypeProvider(queryContext.getTypeProvider()); } instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); + fragment.setIndexInFragmentInstanceList(fragmentInstanceList.size()); fragmentInstanceList.add(fragmentInstance); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java index b74dba62c15..18209bef70f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java @@ -28,7 +28,9 @@ import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.rpc.TSStatusCode; import io.airlift.units.Duration; @@ -55,6 +57,7 @@ public class ClusterScheduler implements IScheduler { // The stateMachine of the QueryExecution owned by this QueryScheduler private final QueryStateMachine stateMachine; private final QueryType queryType; + private final SubPlan rootSubPlan; // The fragment instances which should be sent to corresponding Nodes. private final List<FragmentInstance> instances; @@ -68,14 +71,15 @@ public class ClusterScheduler implements IScheduler { public ClusterScheduler( MPPQueryContext queryContext, QueryStateMachine stateMachine, - List<FragmentInstance> instances, + DistributedQueryPlan distributedQueryPlan, QueryType queryType, ScheduledExecutorService scheduledExecutor, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager, IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncInternalServiceClientManager) { this.stateMachine = stateMachine; - this.instances = instances; + this.rootSubPlan = distributedQueryPlan.getRootSubPlan(); + this.instances = distributedQueryPlan.getInstances(); this.queryType = queryType; this.dispatcher = new FragmentInstanceDispatcherImpl( @@ -109,7 +113,8 @@ public class ClusterScheduler implements IScheduler { public void start() { stateMachine.transitionToDispatching(); long startTime = System.nanoTime(); - Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances); + Future<FragInstanceDispatchResult> dispatchResultFuture = + dispatcher.dispatch(rootSubPlan, instances); // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect. // So we need to start the state fetcher after the dispatching stage. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 111a40470ad..293d5c519c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -38,9 +38,11 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult; import org.apache.iotdb.db.queryengine.execution.executor.RegionReadExecutor; import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableSchemaQuerySuccessfulCallbackVisitor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableSchemaQueryWriteVisitor; @@ -67,6 +69,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -118,14 +121,175 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } @Override - public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) { + public Future<FragInstanceDispatchResult> dispatch( + SubPlan root, List<FragmentInstance> instances) { if (type == QueryType.READ) { - return dispatchRead(instances); + return instances.size() == 1 || root == null + ? dispatchRead(instances) + : topologicalParallelDispatchRead(root, instances); } else { return dispatchWrite(instances); } } + private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead( + SubPlan root, List<FragmentInstance> instances) { + long startTime = System.nanoTime(); + LinkedBlockingQueue<SubPlan> queue = new LinkedBlockingQueue<>(instances.size()); + List<Future<FragInstanceDispatchResult>> futures = new ArrayList<>(instances.size()); + queue.add(root); + try { + while (futures.size() < instances.size()) { + SubPlan next = queue.take(); + FragmentInstance fragmentInstance = + instances.get(next.getPlanFragment().getIndexInFragmentInstanceList()); + futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue)); + } + for (Future<FragInstanceDispatchResult> future : futures) { + FragInstanceDispatchResult result = future.get(); + if (!result.isSuccessful()) { + return immediateFuture(result); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Interrupted when dispatching read async", e); + return immediateFuture( + new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage()))); + } catch (Throwable t) { + LOGGER.warn(DISPATCH_FAILED, t); + return immediateFuture( + new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()))); + } finally { + long queryDispatchReadTime = System.nanoTime() - startTime; + QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ, queryDispatchReadTime); + queryContext.recordDispatchCost(queryDispatchReadTime); + } + return immediateFuture(new FragInstanceDispatchResult(true)); + } + + private Future<FragInstanceDispatchResult> asyncDispatchOneInstance( + SubPlan plan, FragmentInstance instance, LinkedBlockingQueue<SubPlan> queue) { + return FragmentInstanceManager.getInstance() + .getDispatchExecutor() + .submit( + () -> { + try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { + dispatchOneInstance(instance); + queue.addAll(plan.getChildren()); + } catch (FragmentInstanceDispatchException e) { + return new FragInstanceDispatchResult(e.getFailureStatus()); + } catch (Throwable t) { + LOGGER.warn(DISPATCH_FAILED, t); + return new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); + } finally { + // friendly for gc, clear the plan node tree, for some queries select all devices, + // it + // will + // release lots of memory + if (!queryContext.isExplainAnalyze()) { + // EXPLAIN ANALYZE will use these instances, so we can't clear them + instance.getFragment().clearUselessField(); + } else { + // TypeProvider is not used in EXPLAIN ANALYZE, so we can clear it + instance.getFragment().clearTypeProvider(); + } + } + return new FragInstanceDispatchResult(true); + }); + } + + // public Future<FragInstanceDispatchResult> parallelDispatchRead( + // SubPlan root, List<FragmentInstance> instances) { + // long startTime = System.nanoTime(); + // Queue<SubPlan> queue = new LinkedList<>(); + // queue.add(root); + // List<List<FragmentInstance>> dispatchOrder = new ArrayList<>(); + // calculateFragmentInstancesDispatchOrder(dispatchOrder, instances, root, 0); + // try { + // for (List<FragmentInstance> currentLevel : dispatchOrder) { + // List<Future<FragInstanceDispatchResult>> futures = new ArrayList<>(currentLevel.size()); + // for (FragmentInstance fragmentInstance : currentLevel) { + // futures.add(asyncDispatchOneInstance(fragmentInstance)); + // } + // for (Future<FragInstanceDispatchResult> future : futures) { + // try { + // FragInstanceDispatchResult result = future.get(); + // if (!result.isSuccessful()) { + // return immediateFuture(result); + // } + // } catch (Exception e) { + // return immediateFuture( + // new FragInstanceDispatchResult( + // RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()))); + // } + // } + // } + // return immediateFuture(new FragInstanceDispatchResult(true)); + // } finally { + // long queryDispatchReadTime = System.nanoTime() - startTime; + // QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ, queryDispatchReadTime); + // queryContext.recordDispatchCost(queryDispatchReadTime); + // } + // } + // + // private void calculateFragmentInstancesDispatchOrder( + // List<List<FragmentInstance>> result, + // List<FragmentInstance> fragmentInstances, + // SubPlan current, + // int level) { + // List<FragmentInstance> currentLevelFragmentInstances; + // if (level == result.size()) { + // currentLevelFragmentInstances = new ArrayList<>(); + // result.add(currentLevelFragmentInstances); + // } else { + // currentLevelFragmentInstances = result.get(level); + // } + // int indexInFragmentInstanceList = + // current.getPlanFragment().getIndexInFragmentInstanceList(); + // currentLevelFragmentInstances.add(fragmentInstances.get(indexInFragmentInstanceList)); + // for (SubPlan child : current.getChildren()) { + // calculateFragmentInstancesDispatchOrder(result, fragmentInstances, child, level + 1); + // } + // } + // + + // private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(FragmentInstance instance) + // { + // return readDispatchThreadPool.submit( + // () -> { + // try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { + // dispatchOneInstance(instance); + // } catch (FragmentInstanceDispatchException e) { + // return new FragInstanceDispatchResult(e.getFailureStatus()); + // } catch (Throwable t) { + // LOGGER.warn(DISPATCH_FAILED, t); + // new FragInstanceDispatchResult( + // RpcUtils.getStatus( + // TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); + // } finally { + // // friendly for gc, clear the plan node tree, for some queries select all devices, + // it + // // will + // // release lots of memory + // if (!queryContext.isExplainAnalyze()) { + // // EXPLAIN ANALYZE will use these instances, so we can't clear them + // instance.getFragment().clearUselessField(); + // } else { + // // TypeProvider is not used in EXPLAIN ANALYZE, so we can clear it + // instance.getFragment().clearTypeProvider(); + // } + // } + // return new FragInstanceDispatchResult(true); + // }); + // } + // TODO: (xingtanzjr) currently we use a sequential dispatch policy for READ, which is // unsafe for current FragmentInstance scheduler framework. We need to implement the // topological dispatch according to dependency relations between FragmentInstances diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java index 97363e44097..5552063a5d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.scheduler; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import java.util.List; import java.util.concurrent.Future; @@ -28,10 +29,11 @@ public interface IFragInstanceDispatcher { /** * Dispatch all Fragment instances asynchronously * + * @param root the root SubPlan * @param instances Fragment instance list - * @return Boolean. + * @return Future<FragInstanceDispatchResult> */ - Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances); + Future<FragInstanceDispatchResult> dispatch(SubPlan root, List<FragmentInstance> instances); void abort(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java index 5227f230007..de1e6287bc5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode; @@ -99,7 +100,8 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { } @Override - public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) { + public Future<FragInstanceDispatchResult> dispatch( + SubPlan root, List<FragmentInstance> instances) { return executor.submit( () -> { for (FragmentInstance instance : instances) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index ead789b1bcd..a157c5e2b59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -351,7 +351,7 @@ public class LoadTsFileScheduler implements IScheduler { queryContext.getSession()); instance.setExecutorAndHost(new StorageExecutor(replicaSet)); Future<FragInstanceDispatchResult> dispatchResultFuture = - dispatcher.dispatch(Collections.singletonList(instance)); + dispatcher.dispatch(null, Collections.singletonList(instance)); try { FragInstanceDispatchResult result = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java index 1cf5488d917..13442a8eb56 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java @@ -160,6 +160,25 @@ public class IoTDBThreadPoolFactory { poolName); } + /** + * see {@link Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}. + * + * @param poolName the name of thread pool. + * @param corePoolSize the corePoolSize of thread pool + * @return thread pool. + */ + public static ExecutorService newCachedThreadPool(String poolName, int corePoolSize) { + logger.info(NEW_CACHED_THREAD_POOL_LOGGER_FORMAT, poolName); + return new WrappedThreadPoolExecutor( + corePoolSize, + Integer.MAX_VALUE, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + new IoTThreadFactory(poolName), + poolName); + } + /** * see {@link Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}. * diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index e208f0eb87a..e1e840148f3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -33,6 +33,7 @@ public enum ThreadName { TIMED_QUERY_SQL_COUNT("Timed-Query-SQL-Count"), FRAGMENT_INSTANCE_MANAGEMENT("Fragment-Instance-Management"), FRAGMENT_INSTANCE_NOTIFICATION("Fragment-Instance-Notification"), + FRAGMENT_INSTANCE_DISPATCH("Fragment-Instance-Dispatch"), DRIVER_TASK_SCHEDULER_NOTIFICATION("Driver-Task-Scheduler-Notification"), // -------------------------- MPP -------------------------- MPP_COORDINATOR_SCHEDULED_EXECUTOR("MPP-Coordinator-Scheduled-Executor"),
