This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 31ac1c3027 [IOTDB-5399] Collect complete dispatch result for return 
(#8963)
31ac1c3027 is described below

commit 31ac1c3027a87762ce38a69d29c17724bba1ffa3
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Feb 2 17:01:24 2023 +0800

    [IOTDB-5399] Collect complete dispatch result for return (#8963)
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 29 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cedfdbb684..f299d862e9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -49,6 +49,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -119,20 +120,38 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
   }
 
   private Future<FragInstanceDispatchResult> 
dispatchWriteSync(List<FragmentInstance> instances) {
+    List<TSStatus> failureStatusList = new ArrayList<>();
     for (FragmentInstance instance : instances) {
       try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
         dispatchOneInstance(instance);
       } catch (FragmentInstanceDispatchException e) {
-        return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
+        TSStatus failureStatus = e.getFailureStatus();
+        if (instances.size() == 1) {
+          failureStatusList.add(failureStatus);
+        } else {
+          if (failureStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+            failureStatusList.addAll(failureStatus.getSubStatus());
+          } else {
+            failureStatusList.add(failureStatus);
+          }
+        }
       } catch (Throwable t) {
         logger.warn("[DispatchFailed]", t);
+        failureStatusList.add(
+            RpcUtils.getStatus(
+                TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + 
t.getMessage()));
+      }
+    }
+    if (failureStatusList.isEmpty()) {
+      return immediateFuture(new FragInstanceDispatchResult(true));
+    } else {
+      if (instances.size() == 1) {
+        return immediateFuture(new 
FragInstanceDispatchResult(failureStatusList.get(0)));
+      } else {
         return immediateFuture(
-            new FragInstanceDispatchResult(
-                RpcUtils.getStatus(
-                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " 
+ t.getMessage())));
+            new 
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
       }
     }
-    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
   private void dispatchOneInstance(FragmentInstance instance)

Reply via email to