This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/write_opt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d62483d07ae39ff03186577967c3a75eeff66c94 Author: Jinrui.Zhang <[email protected]> AuthorDate: Sun Jun 19 23:38:18 2022 +0800 spotless --- .../scheduler/FragmentInstanceDispatcherImpl.java | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index f994366f3f..35c301e230 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -58,6 +58,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import static com.google.common.util.concurrent.Futures.immediateFuture; + public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { private static final Logger logger = @@ -88,7 +90,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { if (type == QueryType.READ) { return dispatchRead(instances); } else { - return dispatchWrite(instances); + return dispatchWriteSync(instances); } } @@ -134,6 +136,23 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { return resultFuture; } + private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) { + boolean result = true; + try { + for (FragmentInstance instance : instances) { + + if (!dispatchOneInstance(instance)) { + result = false; + break; + } + } + return immediateFuture(new FragInstanceDispatchResult(result)); + } catch (FragmentInstanceDispatchException e) { + logger.error("cannot dispatch FI for write operation", e); + return immediateFuture(new FragInstanceDispatchResult(false)); + } + } + private boolean dispatchOneInstance(FragmentInstance instance) throws FragmentInstanceDispatchException { try (SetThreadName fragmentInstanceName = new SetThreadName(instance.getId().getFullId())) {
