This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d62483d07ae39ff03186577967c3a75eeff66c94
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sun Jun 19 23:38:18 2022 +0800

    spotless
---
 .../scheduler/FragmentInstanceDispatcherImpl.java   | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f994366f3f..35c301e230 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -58,6 +58,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
 public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
   private static final Logger logger =
@@ -88,7 +90,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     if (type == QueryType.READ) {
       return dispatchRead(instances);
     } else {
-      return dispatchWrite(instances);
+      return dispatchWriteSync(instances);
     }
   }
 
@@ -134,6 +136,23 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     return resultFuture;
   }
 
+  private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {
+    boolean result = true;
+    try {
+      for (FragmentInstance instance : instances) {
+
+        if (!dispatchOneInstance(instance)) {
+          result = false;
+          break;
+        }
+      }
+      return immediateFuture(new FragInstanceDispatchResult(result));
+    } catch (FragmentInstanceDispatchException e) {
+      logger.error("cannot dispatch FI for write operation", e);
+      return immediateFuture(new FragInstanceDispatchResult(false));
+    }
+  }
+
   private boolean dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
     try (SetThreadName fragmentInstanceName = new SetThreadName(instance.getId().getFullId())) {

Reply via email to