This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch parallelDispatch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5121db04eb2c391c23cb10a201d6131c11c3da4c
Author: shuwenwei <[email protected]>
AuthorDate: Wed May 21 18:44:21 2025 +0800

    topological parallel dispatch read fragment instance
---
 .../fragment/FragmentInstanceManager.java          |  10 ++
 .../relational/LastQueryAggTableScanOperator.java  |   3 +
 .../queryengine/plan/planner/TreeModelPlanner.java |   2 +-
 .../plan/planner/plan/PlanFragment.java            |   9 ++
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   1 +
 .../plan/relational/planner/TableModelPlanner.java |   2 +-
 .../distribute/TableModelQueryFragmentPlanner.java |   1 +
 .../plan/scheduler/ClusterScheduler.java           |  11 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 168 ++++++++++++++++++++-
 .../plan/scheduler/IFragInstanceDispatcher.java    |   6 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   4 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   2 +-
 .../commons/concurrent/IoTDBThreadPoolFactory.java |  19 +++
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 14 files changed, 228 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index f1eed1d1f4e..97c99cab7f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -84,6 +84,8 @@ public class FragmentInstanceManager {
   private final ExecutorService intoOperationExecutor;
   private final ExecutorService modelInferenceExecutor;
 
+  private final ExecutorService dispatchExecutor;
+
   private final MPPDataExchangeManager exchangeManager =
       MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
 
@@ -104,6 +106,10 @@ public class FragmentInstanceManager {
     this.instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(
             4, ThreadName.FRAGMENT_INSTANCE_NOTIFICATION.getName());
+    this.dispatchExecutor =
+        IoTDBThreadPoolFactory.newCachedThreadPool(
+            ThreadName.FRAGMENT_INSTANCE_DISPATCH.getName(),
+            Math.max(20, Runtime.getRuntime().availableProcessors() * 2));
 
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
@@ -426,6 +432,10 @@ public class FragmentInstanceManager {
     return modelInferenceExecutor;
   }
 
+  public ExecutorService getDispatchExecutor() {
+    return dispatchExecutor;
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
index 312294ff8b9..b078d7f2f34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggr
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
@@ -96,6 +97,8 @@ public class LastQueryAggTableScanOperator extends 
AbstractAggTableScanOperator
     this.hitCachedResults = hitCachedResults;
     this.dbName = qualifiedObjectName.getDatabaseName();
 
+    this.operatorContext.recordSpecifiedInfo(
+        PlanGraphPrinter.CACHED_DEVICE_NUMBER, 
Integer.toString(cachedDeviceEntries.size()));
     for (int i = 0; i < parameter.tableAggregators.size(); i++) {
       if (parameter.tableAggregators.get(i).getAccumulator() instanceof 
LastAccumulator) {
         lastTimeAggregationIdx = i;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index eaaacad64ea..f2d901c1540 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -146,7 +146,7 @@ public class TreeModelPlanner implements IPlanner {
           new ClusterScheduler(
               context,
               stateMachine,
-              distributedPlan.getInstances(),
+              distributedPlan,
               context.getQueryType(),
               scheduledExecutor,
               syncInternalServiceClientManager,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 44f8f23fda8..448e8e2fda0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -58,6 +58,7 @@ public class PlanFragment {
 
   // indicate whether this PlanFragment is the root of the whole 
Fragment-Plan-Tree or not
   private boolean isRoot;
+  private int indexInFragmentInstanceList;
 
   public PlanFragment(PlanFragmentId id, PlanNode planNodeTree) {
     this.id = id;
@@ -65,6 +66,14 @@ public class PlanFragment {
     this.isRoot = false;
   }
 
+  public int getIndexInFragmentInstanceList() {
+    return indexInFragmentInstanceList;
+  }
+
+  public void setIndexInFragmentInstanceList(int indexInFragmentInstanceList) {
+    this.indexInFragmentInstanceList = indexInFragmentInstanceList;
+  }
+
   public PlanFragmentId getId() {
     return id;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index c26327b59b6..94c00f88582 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -116,6 +116,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
 
   private static final String REGION_NOT_ASSIGNED = "Not Assigned";
   public static final String DEVICE_NUMBER = "DeviceNumber";
+  public static final String CACHED_DEVICE_NUMBER = "CachedDeviceNumber";
   public static final String CURRENT_USED_MEMORY = "CurrentUsedMemory";
   public static final String MAX_USED_MEMORY = "MaxUsedMemory";
   public static final String MAX_RESERVED_MEMORY = "MaxReservedMemory";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index a5765ee538c..8d7ce3cebbb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -188,7 +188,7 @@ public class TableModelPlanner implements IPlanner {
           new ClusterScheduler(
               context,
               stateMachine,
-              distributedPlan.getInstances(),
+              distributedPlan,
               context.getQueryType(),
               scheduledExecutor,
               syncInternalServiceClientManager,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index 560e8a844a2..8f7c2aed732 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -155,6 +155,7 @@ public class TableModelQueryFragmentPlanner extends 
AbstractFragmentParallelPlan
       
fragmentInstance.getFragment().generateTableModelTypeProvider(queryContext.getTypeProvider());
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
+    fragment.setIndexInFragmentInstanceList(fragmentInstanceList.size());
     fragmentInstanceList.add(fragmentInstance);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index b74dba62c15..18209bef70f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -28,7 +28,9 @@ import 
org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.units.Duration;
@@ -55,6 +57,7 @@ public class ClusterScheduler implements IScheduler {
   // The stateMachine of the QueryExecution owned by this QueryScheduler
   private final QueryStateMachine stateMachine;
   private final QueryType queryType;
+  private final SubPlan rootSubPlan;
   // The fragment instances which should be sent to corresponding Nodes.
   private final List<FragmentInstance> instances;
 
@@ -68,14 +71,15 @@ public class ClusterScheduler implements IScheduler {
   public ClusterScheduler(
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
-      List<FragmentInstance> instances,
+      DistributedQueryPlan distributedQueryPlan,
       QueryType queryType,
       ScheduledExecutorService scheduledExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncInternalServiceClientManager,
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
           asyncInternalServiceClientManager) {
     this.stateMachine = stateMachine;
-    this.instances = instances;
+    this.rootSubPlan = distributedQueryPlan.getRootSubPlan();
+    this.instances = distributedQueryPlan.getInstances();
     this.queryType = queryType;
     this.dispatcher =
         new FragmentInstanceDispatcherImpl(
@@ -109,7 +113,8 @@ public class ClusterScheduler implements IScheduler {
   public void start() {
     stateMachine.transitionToDispatching();
     long startTime = System.nanoTime();
-    Future<FragInstanceDispatchResult> dispatchResultFuture = 
dispatcher.dispatch(instances);
+    Future<FragInstanceDispatchResult> dispatchResultFuture =
+        dispatcher.dispatch(rootSubPlan, instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to 
consensus redirect.
     // So we need to start the state fetcher after the dispatching stage.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 111a40470ad..293d5c519c1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -38,9 +38,11 @@ import 
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
 import org.apache.iotdb.db.queryengine.execution.executor.RegionReadExecutor;
 import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableSchemaQuerySuccessfulCallbackVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableSchemaQueryWriteVisitor;
@@ -67,6 +69,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -118,14 +121,175 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
   }
 
   @Override
-  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances) {
+  public Future<FragInstanceDispatchResult> dispatch(
+      SubPlan root, List<FragmentInstance> instances) {
     if (type == QueryType.READ) {
-      return dispatchRead(instances);
+      return instances.size() == 1 || root == null
+          ? dispatchRead(instances)
+          : topologicalParallelDispatchRead(root, instances);
     } else {
       return dispatchWrite(instances);
     }
   }
 
+  private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+      SubPlan root, List<FragmentInstance> instances) {
+    long startTime = System.nanoTime();
+    LinkedBlockingQueue<SubPlan> queue = new 
LinkedBlockingQueue<>(instances.size());
+    List<Future<FragInstanceDispatchResult>> futures = new 
ArrayList<>(instances.size());
+    queue.add(root);
+    try {
+      while (futures.size() < instances.size()) {
+        SubPlan next = queue.take();
+        FragmentInstance fragmentInstance =
+            
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+        futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+      }
+      for (Future<FragInstanceDispatchResult> future : futures) {
+        FragInstanceDispatchResult result = future.get();
+        if (!result.isSuccessful()) {
+          return immediateFuture(result);
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.error("Interrupted when dispatching read async", e);
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + 
e.getMessage())));
+    } catch (Throwable t) {
+      LOGGER.warn(DISPATCH_FAILED, t);
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage())));
+    } finally {
+      long queryDispatchReadTime = System.nanoTime() - startTime;
+      QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ, 
queryDispatchReadTime);
+      queryContext.recordDispatchCost(queryDispatchReadTime);
+    }
+    return immediateFuture(new FragInstanceDispatchResult(true));
+  }
+
+  private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
+      SubPlan plan, FragmentInstance instance, LinkedBlockingQueue<SubPlan> 
queue) {
+    return FragmentInstanceManager.getInstance()
+        .getDispatchExecutor()
+        .submit(
+            () -> {
+              try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
+                dispatchOneInstance(instance);
+                queue.addAll(plan.getChildren());
+              } catch (FragmentInstanceDispatchException e) {
+                return new FragInstanceDispatchResult(e.getFailureStatus());
+              } catch (Throwable t) {
+                LOGGER.warn(DISPATCH_FAILED, t);
+                return new FragInstanceDispatchResult(
+                    RpcUtils.getStatus(
+                        TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS 
+ t.getMessage()));
+              } finally {
+                // friendly for gc, clear the plan node tree, for some queries 
select all devices,
+                // it
+                // will
+                // release lots of memory
+                if (!queryContext.isExplainAnalyze()) {
+                  // EXPLAIN ANALYZE will use these instances, so we can't 
clear them
+                  instance.getFragment().clearUselessField();
+                } else {
+                  // TypeProvider is not used in EXPLAIN ANALYZE, so we can 
clear it
+                  instance.getFragment().clearTypeProvider();
+                }
+              }
+              return new FragInstanceDispatchResult(true);
+            });
+  }
+
+  //  public Future<FragInstanceDispatchResult> parallelDispatchRead(
+  //      SubPlan root, List<FragmentInstance> instances) {
+  //    long startTime = System.nanoTime();
+  //    Queue<SubPlan> queue = new LinkedList<>();
+  //    queue.add(root);
+  //    List<List<FragmentInstance>> dispatchOrder = new ArrayList<>();
+  //    calculateFragmentInstancesDispatchOrder(dispatchOrder, instances, 
root, 0);
+  //    try {
+  //      for (List<FragmentInstance> currentLevel : dispatchOrder) {
+  //        List<Future<FragInstanceDispatchResult>> futures = new 
ArrayList<>(currentLevel.size());
+  //        for (FragmentInstance fragmentInstance : currentLevel) {
+  //          futures.add(asyncDispatchOneInstance(fragmentInstance));
+  //        }
+  //        for (Future<FragInstanceDispatchResult> future : futures) {
+  //          try {
+  //            FragInstanceDispatchResult result = future.get();
+  //            if (!result.isSuccessful()) {
+  //              return immediateFuture(result);
+  //            }
+  //          } catch (Exception e) {
+  //            return immediateFuture(
+  //                new FragInstanceDispatchResult(
+  //                    RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage())));
+  //          }
+  //        }
+  //      }
+  //      return immediateFuture(new FragInstanceDispatchResult(true));
+  //    } finally {
+  //      long queryDispatchReadTime = System.nanoTime() - startTime;
+  //      QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ, 
queryDispatchReadTime);
+  //      queryContext.recordDispatchCost(queryDispatchReadTime);
+  //    }
+  //  }
+  //
+  //  private void calculateFragmentInstancesDispatchOrder(
+  //      List<List<FragmentInstance>> result,
+  //      List<FragmentInstance> fragmentInstances,
+  //      SubPlan current,
+  //      int level) {
+  //    List<FragmentInstance> currentLevelFragmentInstances;
+  //    if (level == result.size()) {
+  //      currentLevelFragmentInstances = new ArrayList<>();
+  //      result.add(currentLevelFragmentInstances);
+  //    } else {
+  //      currentLevelFragmentInstances = result.get(level);
+  //    }
+  //    int indexInFragmentInstanceList =
+  // current.getPlanFragment().getIndexInFragmentInstanceList();
+  //    
currentLevelFragmentInstances.add(fragmentInstances.get(indexInFragmentInstanceList));
+  //    for (SubPlan child : current.getChildren()) {
+  //      calculateFragmentInstancesDispatchOrder(result, fragmentInstances, 
child, level + 1);
+  //    }
+  //  }
+  //
+
+  //  private Future<FragInstanceDispatchResult> 
asyncDispatchOneInstance(FragmentInstance instance)
+  // {
+  //    return readDispatchThreadPool.submit(
+  //        () -> {
+  //          try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
+  //            dispatchOneInstance(instance);
+  //          } catch (FragmentInstanceDispatchException e) {
+  //            return new FragInstanceDispatchResult(e.getFailureStatus());
+  //          } catch (Throwable t) {
+  //            LOGGER.warn(DISPATCH_FAILED, t);
+  //            new FragInstanceDispatchResult(
+  //                RpcUtils.getStatus(
+  //                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS 
+ t.getMessage()));
+  //          } finally {
+  //            // friendly for gc, clear the plan node tree, for some queries 
select all devices,
+  // it
+  //            // will
+  //            // release lots of memory
+  //            if (!queryContext.isExplainAnalyze()) {
+  //              // EXPLAIN ANALYZE will use these instances, so we can't 
clear them
+  //              instance.getFragment().clearUselessField();
+  //            } else {
+  //              // TypeProvider is not used in EXPLAIN ANALYZE, so we can 
clear it
+  //              instance.getFragment().clearTypeProvider();
+  //            }
+  //          }
+  //          return new FragInstanceDispatchResult(true);
+  //        });
+  //  }
+
   // TODO: (xingtanzjr) currently we use a sequential dispatch policy for 
READ, which is
   //  unsafe for current FragmentInstance scheduler framework. We need to 
implement the
   //  topological dispatch according to dependency relations between 
FragmentInstances
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
index 97363e44097..5552063a5d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 
 import java.util.List;
 import java.util.concurrent.Future;
@@ -28,10 +29,11 @@ public interface IFragInstanceDispatcher {
   /**
    * Dispatch all Fragment instances asynchronously
    *
+   * @param root the root SubPlan
    * @param instances Fragment instance list
-   * @return Boolean.
+   * @return Future<FragInstanceDispatchResult>
    */
-  Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances);
+  Future<FragInstanceDispatchResult> dispatch(SubPlan root, 
List<FragmentInstance> instances);
 
   void abort();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 5227f230007..de1e6287bc5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
@@ -99,7 +100,8 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
   }
 
   @Override
-  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances) {
+  public Future<FragInstanceDispatchResult> dispatch(
+      SubPlan root, List<FragmentInstance> instances) {
     return executor.submit(
         () -> {
           for (FragmentInstance instance : instances) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index ead789b1bcd..a157c5e2b59 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -351,7 +351,7 @@ public class LoadTsFileScheduler implements IScheduler {
             queryContext.getSession());
     instance.setExecutorAndHost(new StorageExecutor(replicaSet));
     Future<FragInstanceDispatchResult> dispatchResultFuture =
-        dispatcher.dispatch(Collections.singletonList(instance));
+        dispatcher.dispatch(null, Collections.singletonList(instance));
 
     try {
       FragInstanceDispatchResult result =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 1cf5488d917..13442a8eb56 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -160,6 +160,25 @@ public class IoTDBThreadPoolFactory {
         poolName);
   }
 
+  /**
+   * see {@link 
Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}.
+   *
+   * @param poolName the name of thread pool.
+   * @param corePoolSize the corePoolSize of thread pool
+   * @return thread pool.
+   */
+  public static ExecutorService newCachedThreadPool(String poolName, int 
corePoolSize) {
+    logger.info(NEW_CACHED_THREAD_POOL_LOGGER_FORMAT, poolName);
+    return new WrappedThreadPoolExecutor(
+        corePoolSize,
+        Integer.MAX_VALUE,
+        60L,
+        TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new IoTThreadFactory(poolName),
+        poolName);
+  }
+
   /**
    * see {@link 
Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}.
    *
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e208f0eb87a..e1e840148f3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -33,6 +33,7 @@ public enum ThreadName {
   TIMED_QUERY_SQL_COUNT("Timed-Query-SQL-Count"),
   FRAGMENT_INSTANCE_MANAGEMENT("Fragment-Instance-Management"),
   FRAGMENT_INSTANCE_NOTIFICATION("Fragment-Instance-Notification"),
+  FRAGMENT_INSTANCE_DISPATCH("Fragment-Instance-Dispatch"),
   DRIVER_TASK_SCHEDULER_NOTIFICATION("Driver-Task-Scheduler-Notification"),
   // -------------------------- MPP --------------------------
   MPP_COORDINATOR_SCHEDULED_EXECUTOR("MPP-Coordinator-Scheduled-Executor"),

Reply via email to