This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch parallelDispatch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f51faac6deac6760c53845203f751c56a56626e6 Author: shuwenwei <[email protected]> AuthorDate: Thu May 22 10:12:47 2025 +0800 break loop when the previous dispatch failed --- .../scheduler/FragmentInstanceDispatcherImpl.java | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 293d5c519c1..f773da76d0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.thrift.TException; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,12 +136,17 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead( SubPlan root, List<FragmentInstance> instances) { long startTime = System.nanoTime(); - LinkedBlockingQueue<SubPlan> queue = new LinkedBlockingQueue<>(instances.size()); + LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new LinkedBlockingQueue<>(instances.size()); List<Future<FragInstanceDispatchResult>> futures = new ArrayList<>(instances.size()); - queue.add(root); + queue.add(new Pair<>(root, true)); try { while (futures.size() < instances.size()) { - SubPlan next = queue.take(); + Pair<SubPlan, Boolean> pair = queue.take(); + SubPlan next = pair.getLeft(); + boolean previousSuccess = pair.getRight(); + if (!previousSuccess) { + break; + } FragmentInstance fragmentInstance = instances.get(next.getPlanFragment().getIndexInFragmentInstanceList()); futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue)); @@ -173,14 +179,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } private Future<FragInstanceDispatchResult> asyncDispatchOneInstance( - SubPlan plan, FragmentInstance instance, LinkedBlockingQueue<SubPlan> queue) { + SubPlan plan, FragmentInstance instance, LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) { return FragmentInstanceManager.getInstance() .getDispatchExecutor() .submit( () -> { + boolean success = false; try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { dispatchOneInstance(instance); - queue.addAll(plan.getChildren()); + success = true; } catch (FragmentInstanceDispatchException e) { return new FragInstanceDispatchResult(e.getFailureStatus()); } catch (Throwable t) { @@ -189,6 +196,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { RpcUtils.getStatus( TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); } finally { + for (SubPlan child : plan.getChildren()) { + queue.add(new Pair<>(child, success)); + } // friendly for gc, clear the plan node tree, for some queries select all devices, // it // will
