This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch parallelDispatch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f51faac6deac6760c53845203f751c56a56626e6
Author: shuwenwei <[email protected]>
AuthorDate: Thu May 22 10:12:47 2025 +0800

    break loop when the previous dispatch failed
---
 .../scheduler/FragmentInstanceDispatcherImpl.java    | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 293d5c519c1..f773da76d0a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,12 +136,17 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
       SubPlan root, List<FragmentInstance> instances) {
     long startTime = System.nanoTime();
-    LinkedBlockingQueue<SubPlan> queue = new LinkedBlockingQueue<>(instances.size());
+    LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new LinkedBlockingQueue<>(instances.size());
     List<Future<FragInstanceDispatchResult>> futures = new ArrayList<>(instances.size());
-    queue.add(root);
+    queue.add(new Pair<>(root, true));
     try {
       while (futures.size() < instances.size()) {
-        SubPlan next = queue.take();
+        Pair<SubPlan, Boolean> pair = queue.take();
+        SubPlan next = pair.getLeft();
+        boolean previousSuccess = pair.getRight();
+        if (!previousSuccess) {
+          break;
+        }
         FragmentInstance fragmentInstance =
             instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
         futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
@@ -173,14 +179,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   }
 
   private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
-      SubPlan plan, FragmentInstance instance, LinkedBlockingQueue<SubPlan> queue) {
+      SubPlan plan, FragmentInstance instance, LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) {
     return FragmentInstanceManager.getInstance()
         .getDispatchExecutor()
         .submit(
             () -> {
+              boolean success = false;
               try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
                 dispatchOneInstance(instance);
-                queue.addAll(plan.getChildren());
+                success = true;
               } catch (FragmentInstanceDispatchException e) {
                 return new FragInstanceDispatchResult(e.getFailureStatus());
               } catch (Throwable t) {
@@ -189,6 +196,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
                     RpcUtils.getStatus(
                         TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
               } finally {
+                for (SubPlan child : plan.getChildren()) {
+                  queue.add(new Pair<>(child, success));
+                }
                 // friendly for gc, clear the plan node tree, for some queries select all devices,
                 // it
                 // will

Reply via email to