This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryException
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 50ed598efc04e8b6aa05d82cac36fe57be6db919
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jul 12 13:53:27 2022 +0800

    make query error more concrete
---
 .../db/mpp/execution/exchange/ISourceHandle.java   |  12 +-
 .../mpp/execution/exchange/LocalSourceHandle.java  |  45 +++++-
 .../execution/exchange/MPPDataExchangeManager.java |  12 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  25 +++-
 .../db/mpp/execution/exchange/SourceHandle.java    |  48 +++++--
 .../fragment/FragmentInstanceContext.java          |   7 +
 .../fragment/FragmentInstanceExecution.java        |  12 +-
 .../fragment/FragmentInstanceManager.java          |   3 +-
 .../operator/source/ExchangeOperator.java          |   2 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |   8 +-
 .../db/mpp/plan/execution/IQueryExecution.java     |   3 +-
 .../db/mpp/plan/execution/QueryExecution.java      |  32 ++++-
 .../plan/execution/memory/MemorySourceHandle.java  |   3 +
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |   1 +
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 158 +++++++++++----------
 .../mpprest/handler/QueryDataSetHandler.java       |  21 +--
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  80 ++++++++---
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |   3 +-
 thrift/src/main/thrift/datanode.thrift             |   1 +
 20 files changed, 339 insertions(+), 139 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
index be62bff4ff..f2538c8db7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
@@ -50,8 +50,18 @@ public interface ISourceHandle {
   boolean isAborted();
 
   /**
-   * Abort the handle. Discard all tsblocks which may still be in the memory 
buffer and complete the
+   * Abort the handle. Discard all tsblocks which may still be in the memory 
buffer and cancel the
    * future returned by {@link #isBlocked()}.
+   *
+   * <p>Should only be called in abnormal case
    */
   void abort();
+
+  /**
+   * Close the handle. Discard all tsblocks which may still be in the memory 
buffer and complete the
+   * future returned by {@link #isBlocked()}.
+   *
+   * <p>Should only be called in normal case
+   */
+  void close();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index c101e7b644..95056cf44b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -43,6 +43,8 @@ public class LocalSourceHandle implements ISourceHandle {
   private final SharedTsBlockQueue queue;
   private boolean aborted = false;
 
+  private boolean closed = false;
+
   private int currSequenceId;
 
   private final String threadName;
@@ -81,9 +83,8 @@ public class LocalSourceHandle implements ISourceHandle {
   @Override
   public TsBlock receive() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      if (aborted) {
-        throw new IllegalStateException("Source handle is aborted.");
-      }
+      checkState();
+
       if (!queue.isBlocked().isDone()) {
         throw new IllegalStateException("Source handle is blocked.");
       }
@@ -123,9 +124,7 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    if (aborted) {
-      throw new IllegalStateException("Source handle is closed.");
-    }
+    checkState();
     return nonCancellationPropagating(queue.isBlocked());
   }
 
@@ -136,6 +135,9 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public void abort() {
+    if (aborted || closed) {
+      return;
+    }
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       logger.info("Source handle is being aborted.");
       synchronized (queue) {
@@ -143,7 +145,7 @@ public class LocalSourceHandle implements ISourceHandle {
           if (aborted) {
             return;
           }
-          queue.destroy();
+          queue.abort();
           aborted = true;
           sourceHandleListener.onAborted(this);
         }
@@ -152,6 +154,35 @@ public class LocalSourceHandle implements ISourceHandle {
     }
   }
 
+  @Override
+  public void close() {
+    if (aborted || closed) {
+      return;
+    }
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      logger.info("Source handle is being closed.");
+      synchronized (queue) {
+        synchronized (this) {
+          if (aborted) {
+            return;
+          }
+          queue.destroy();
+          closed = true;
+          sourceHandleListener.onFinished(this);
+        }
+      }
+      logger.info("Source handle is closed");
+    }
+  }
+
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("Source Handle is closed.");
+    }
+  }
+
   public TFragmentInstanceId getRemoteFragmentInstanceId() {
     return remoteFragmentInstanceId;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 8aa2a6fa3a..7f087e2a9a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -141,7 +141,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             || sourceHandles
                 .get(e.getTargetFragmentInstanceId())
                 .get(e.getTargetPlanNodeId())
-                .isAborted()) {
+                .isAborted()
+            || sourceHandles
+                .get(e.getTargetFragmentInstanceId())
+                .get(e.getTargetPlanNodeId())
+                .isFinished()) {
           // In some scenario, when the SourceHandle sends the data block ACK 
event, its upstream
           // may
           // have already been stopped. For example, in the query whit 
LimitOperator, the downstream
@@ -176,7 +180,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             || sourceHandles
                 .get(e.getTargetFragmentInstanceId())
                 .get(e.getTargetPlanNodeId())
-                .isAborted()) {
+                .isAborted()
+            || sourceHandles
+                .get(e.getTargetFragmentInstanceId())
+                .get(e.getTargetPlanNodeId())
+                .isFinished()) {
           logger.warn(
               "received onEndOfDataBlockEvent but the downstream 
FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 7aa96230ba..7cac64a6e3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -152,7 +152,7 @@ public class SharedTsBlockQueue {
     return blockedOnMemory;
   }
 
-  /** Destroy the queue and cancel the future. */
+  /** Destroy the queue and complete the future. Should only be called in 
normal case */
   public void destroy() {
     if (destroyed) {
       return;
@@ -172,4 +172,27 @@ public class SharedTsBlockQueue {
       bufferRetainedSizeInBytes = 0;
     }
   }
+
+  // TODO add Throwable t as a parameter of this method, and then call 
blocked.setException(t);
+  // instead of blocked.cancel(true);
+  /** Destroy the queue and cancel the future. Should only be called in abnormal 
case */
+  public void abort() {
+    if (destroyed) {
+      return;
+    }
+    destroyed = true;
+    if (!blocked.isDone()) {
+      blocked.cancel(true);
+    }
+    if (blockedOnMemory != null) {
+      bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+    }
+    queue.clear();
+    if (bufferRetainedSizeInBytes > 0L) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), 
bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index facd0c1e0f..15ac9e0529 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -86,6 +86,8 @@ public class SourceHandle implements ISourceHandle {
   private int lastSequenceId = Integer.MAX_VALUE;
   private boolean aborted = false;
 
+  private boolean closed = false;
+
   public SourceHandle(
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -116,9 +118,8 @@ public class SourceHandle implements ISourceHandle {
   public synchronized TsBlock receive() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
 
-      if (aborted) {
-        throw new IllegalStateException("Source handle is aborted.");
-      }
+      checkState();
+
       if (!blocked.isDone()) {
         throw new IllegalStateException("Source handle is blocked.");
       }
@@ -149,7 +150,7 @@ public class SourceHandle implements ISourceHandle {
 
   private synchronized void trySubmitGetDataBlocksTask() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      if (aborted) {
+      if (aborted || closed) {
         return;
       }
       if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
@@ -202,9 +203,7 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized ListenableFuture<?> isBlocked() {
-    if (aborted) {
-      throw new IllegalStateException("Source handle is aborted.");
-    }
+    checkState();
     return nonCancellationPropagating(blocked);
   }
 
@@ -234,7 +233,7 @@ public class SourceHandle implements ISourceHandle {
   @Override
   public synchronized void abort() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      if (aborted) {
+      if (aborted || closed) {
         return;
       }
       if (blocked != null && !blocked.isDone()) {
@@ -255,6 +254,31 @@ public class SourceHandle implements ISourceHandle {
     }
   }
 
+  @Override
+  public synchronized void close() {
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      if (aborted || closed) {
+        return;
+      }
+      if (blocked != null && !blocked.isDone()) {
+        blocked.set(null);
+      }
+      if (blockedOnMemory != null) {
+        bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+      }
+      sequenceIdToDataBlockSize.clear();
+      if (bufferRetainedSizeInBytes > 0) {
+        localMemoryManager
+            .getQueryPool()
+            .free(localFragmentInstanceId.getQueryId(), 
bufferRetainedSizeInBytes);
+        bufferRetainedSizeInBytes = 0;
+      }
+      closed = true;
+      currSequenceId = lastSequenceId + 1;
+      sourceHandleListener.onFinished(this);
+    }
+  }
+
   @Override
   public boolean isFinished() {
     return remoteTsBlockedConsumedUp();
@@ -293,6 +317,14 @@ public class SourceHandle implements ISourceHandle {
     return aborted;
   }
 
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("SourceHandle is closed.");
+    }
+  }
+
   @Override
   public String toString() {
     return String.format(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index ffc9d101b3..0d5cdf54d1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -150,6 +151,12 @@ public class FragmentInstanceContext extends QueryContext {
     stateMachine.failed(cause);
   }
 
+  public String getFailedCause() {
+    return stateMachine.getFailureCauses().stream()
+        .map(Throwable::getMessage)
+        .collect(Collectors.joining("; "));
+  }
+
   public void finished() {
     stateMachine.finished();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index ab51fa842d..ac70a848dd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -84,12 +84,8 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(stateMachine.getState(), 
context.getEndTime());
-  }
-
-  public void failed(Throwable cause) {
-    requireNonNull(cause, "cause is null");
-    stateMachine.failed(cause);
+    return new FragmentInstanceInfo(
+        stateMachine.getState(), context.getEndTime(), 
context.getFailedCause());
   }
 
   public void cancel() {
@@ -121,7 +117,9 @@ public class FragmentInstanceExecution {
             sinkHandle.abort();
             // help for gc
             sinkHandle = null;
-            scheduler.abortFragmentInstance(instanceId);
+            if (newState.isFailed()) {
+              scheduler.abortFragmentInstance(instanceId);
+            }
           }
         });
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 504d3e4676..51b689eda2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -194,8 +194,9 @@ public class FragmentInstanceManager {
   }
 
   private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId 
instanceId) {
+    FragmentInstanceContext context = instanceContext.get(instanceId);
     return new FragmentInstanceInfo(
-        FragmentInstanceState.FAILED, 
instanceContext.get(instanceId).getEndTime());
+        FragmentInstanceState.FAILED, context.getEndTime(), 
context.getFailedCause());
   }
 
   private void removeOldInstances() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 3579a642bf..c72063caeb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -81,6 +81,6 @@ public class ExchangeOperator implements SourceOperator {
 
   @Override
   public void close() throws Exception {
-    sourceHandle.abort();
+    sourceHandle.close();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 19356f6471..2532cc4cc0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -119,7 +120,12 @@ public class ClusterSchemaFetcher implements 
ISchemaFetcher {
         while (coordinator.getQueryExecution(queryId).hasNextResult()) {
           // The query will be transited to FINISHED when invoking 
getBatchResult() at the last time
           // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock = 
coordinator.getQueryExecution(queryId).getBatchResult();
+          Optional<TsBlock> tsBlock;
+          try {
+            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          } catch (IoTDBException e) {
+            throw new RuntimeException("Fetch Schema failed. ", e);
+          }
           if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
             break;
           }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 70fffda9b6..ba0380d015 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.execution;
 
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
@@ -34,7 +35,7 @@ public interface IQueryExecution {
 
   ExecutionResult getStatus();
 
-  Optional<TsBlock> getBatchResult();
+  Optional<TsBlock> getBatchResult() throws IoTDBException;
 
   boolean hasNextResult();
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4bae3eeafc..fe6d48afa2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -64,7 +65,6 @@ import io.airlift.concurrent.SetThreadName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -74,6 +74,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.throwIfUnchecked;
 import static 
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
@@ -280,11 +281,22 @@ public class QueryExecution implements IQueryExecution {
    * implemented with DataStreamManager)
    */
   @Override
-  public Optional<TsBlock> getBatchResult() {
+  public Optional<TsBlock> getBatchResult() throws IoTDBException {
+    checkArgument(resultHandle != null, "ResultHandle in Coordinator should be 
init firstly.");
     // iterate until we get a non-nullable TsBlock or result is finished
     while (true) {
       try {
-        if (resultHandle == null || resultHandle.isAborted() || 
resultHandle.isFinished()) {
+        if (resultHandle.isAborted()) {
+          logger.info("resultHandle for client is aborted");
+          stateMachine.transitionToAborted();
+          if (stateMachine.getFailureStatus() != null) {
+            throw new IoTDBException(
+                stateMachine.getFailureStatus().getMessage(), 
stateMachine.getFailureStatus().code);
+          } else {
+            throw new IoTDBException(
+                stateMachine.getFailureMessage(), 
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+          }
+        } else if (resultHandle.isFinished()) {
           // Once the resultHandle is finished, we should transit the state of 
this query to
           // FINISHED.
           // So that the corresponding cleanup work could be triggered.
@@ -292,6 +304,7 @@ public class QueryExecution implements IQueryExecution {
           stateMachine.transitionToFinished();
           return Optional.empty();
         }
+
         ListenableFuture<?> blocked = resultHandle.isBlocked();
         blocked.get();
         if (!resultHandle.isFinished()) {
@@ -305,13 +318,17 @@ public class QueryExecution implements IQueryExecution {
         }
       } catch (ExecutionException | CancellationException e) {
         stateMachine.transitionToFailed(e);
+        if (stateMachine.getFailureStatus() != null) {
+          throw new IoTDBException(
+              stateMachine.getFailureStatus().getMessage(), 
stateMachine.getFailureStatus().code);
+        }
         Throwable t = e.getCause() == null ? e : e.getCause();
         throwIfUnchecked(t);
-        throw new RuntimeException(t);
+        throw new IoTDBException(t, 
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
       } catch (InterruptedException e) {
         stateMachine.transitionToFailed(e);
         Thread.currentThread().interrupt();
-        throw new RuntimeException(new SQLException("ResultSet thread was 
interrupted", e));
+        throw new IoTDBException(e, 
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
       }
     }
   }
@@ -363,7 +380,10 @@ public class QueryExecution implements IQueryExecution {
       }
       return new ExecutionResult(
           context.getQueryId(),
-          RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
stateMachine.getFailureMessage()));
+          stateMachine.getFailureStatus() == null
+              ? RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, 
stateMachine.getFailureMessage())
+              : stateMachine.getFailureStatus());
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index 588407eae1..2cf224962b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -77,4 +77,7 @@ public class MemorySourceHandle implements ISourceHandle {
 
   @Override
   public void abort() {}
+
+  @Override
+  public void close() {}
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 0a6d51eb4a..73730864d2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -103,6 +103,7 @@ public class ClusterScheduler implements IScheduler {
         if (result.getFailureStatus() != null) {
           stateMachine.transitionToFailed(result.getFailureStatus());
         } else {
+          // won't get into here
           stateMachine.transitionToFailed(
               new IllegalStateException("Fragment cannot be dispatched"));
         }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index d3f0d21e86..21a4436a6b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -46,16 +47,13 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.concurrent.SetThreadName;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -102,67 +100,45 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     return executor.submit(
         () -> {
           for (FragmentInstance instance : instances) {
-            boolean accepted = dispatchOneInstance(instance);
-            if (!accepted) {
-              return new FragInstanceDispatchResult(false);
+            try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
+              dispatchOneInstance(instance);
+            } catch (FragmentInstanceDispatchException e) {
+              return new FragInstanceDispatchResult(e.getFailureStatus());
+            } catch (Throwable t) {
+              logger.error("cannot dispatch FI for read operation", t);
+              return new FragInstanceDispatchResult(
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: 
" + t.getMessage()));
             }
           }
           return new FragInstanceDispatchResult(true);
         });
   }
 
-  // TODO: (xingtanzjr) Return the detailed write states for each 
FragmentInstance
-  private Future<FragInstanceDispatchResult> 
dispatchWrite(List<FragmentInstance> instances) {
-    List<Future<Boolean>> futures = new LinkedList<>();
-    for (FragmentInstance instance : instances) {
-      futures.add(writeOperationExecutor.submit(() -> 
dispatchOneInstance(instance)));
-    }
-    SettableFuture<FragInstanceDispatchResult> resultFuture = 
SettableFuture.create();
-    for (Future<Boolean> future : futures) {
-      try {
-        Boolean success = future.get();
-        if (!success) {
-          resultFuture.set(new FragInstanceDispatchResult(false));
-          break;
-        }
-      } catch (ExecutionException | InterruptedException e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        resultFuture.setException(e);
-        break;
-      }
-    }
-    resultFuture.set(new FragInstanceDispatchResult(true));
-    return resultFuture;
-  }
-
   private Future<FragInstanceDispatchResult> 
dispatchWriteSync(List<FragmentInstance> instances) {
-    boolean result = true;
-    try {
-      for (FragmentInstance instance : instances) {
-
-        if (!dispatchOneInstance(instance)) {
-          result = false;
-          break;
-        }
+    for (FragmentInstance instance : instances) {
+      try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
+        dispatchOneInstance(instance);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.error("cannot dispatch FI for write operation", t);
+        return immediateFuture(
+            new FragInstanceDispatchResult(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " 
+ t.getMessage())));
       }
-      return immediateFuture(new FragInstanceDispatchResult(result));
-    } catch (FragmentInstanceDispatchException e) {
-      logger.error("cannot dispatch FI for write operation", e);
-      return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
     }
+    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
-  private boolean dispatchOneInstance(FragmentInstance instance)
+  private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
-    try (SetThreadName fragmentInstanceName = new 
SetThreadName(instance.getId().getFullId())) {
-      TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
-      if (isDispatchedToLocal(endPoint)) {
-        return dispatchLocally(instance);
-      } else {
-        return dispatchRemote(instance, endPoint);
-      }
+    TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
+    if (isDispatchedToLocal(endPoint)) {
+      dispatchLocally(instance);
+    } else {
+      dispatchRemote(instance, endPoint);
     }
   }
 
@@ -170,7 +146,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     return this.localhostIpAddr.equals(endPoint.getIp()) && 
localhostInternalPort == endPoint.port;
   }
 
-  private boolean dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+  private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
     try (SyncDataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
@@ -182,7 +158,11 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
                   instance.getRegionReplicaSet().getRegionId());
           TSendFragmentInstanceResp sendFragmentInstanceResp =
               client.sendFragmentInstance(sendFragmentInstanceReq);
-          return sendFragmentInstanceResp.accepted;
+          if (!sendFragmentInstanceResp.accepted) {
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(
+                    TSStatusCode.EXECUTE_STATEMENT_ERROR, 
sendFragmentInstanceResp.message));
+          }
         case WRITE:
           TSendPlanNodeReq sendPlanNodeReq =
               new TSendPlanNodeReq(
@@ -193,36 +173,68 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
             logger.error(sendPlanNodeResp.getStatus().message);
             throw new 
FragmentInstanceDispatchException(sendPlanNodeResp.getStatus());
           }
-          return true;
       }
     } catch (IOException | TException e) {
       logger.error("can't connect to node {}", endPoint, e);
-      throw new FragmentInstanceDispatchException(e);
+      TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode());
+      status.setMessage("can't connect to node " + endPoint);
+      throw new FragmentInstanceDispatchException(status);
     }
-    return false;
   }
 
-  private boolean dispatchLocally(FragmentInstance instance)
-      throws FragmentInstanceDispatchException {
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.createFromTConsensusGroupId(
-            instance.getRegionReplicaSet().getRegionId());
+  private void dispatchLocally(FragmentInstance instance) throws 
FragmentInstanceDispatchException {
+    // deserialize ConsensusGroupId
+    ConsensusGroupId groupId;
+    try {
+      groupId =
+          ConsensusGroupId.Factory.createFromTConsensusGroupId(
+              instance.getRegionReplicaSet().getRegionId());
+    } catch (Throwable t) {
+      logger.error("Deserialize ConsensusGroupId failed. ", t);
+      throw new FragmentInstanceDispatchException(
+          RpcUtils.getStatus(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR,
+              "Deserialize ConsensusGroupId failed: " + t.getMessage()));
+    }
+
     switch (instance.getType()) {
       case READ:
+        // execute fragment instance in state machine
         ConsensusReadResponse readResponse;
-        if (groupId instanceof DataRegionId) {
-          readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
instance);
-        } else {
-          readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, 
instance);
+        try {
+          if (groupId instanceof DataRegionId) {
+            readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
instance);
+          } else {
+            readResponse = 
SchemaRegionConsensusImpl.getInstance().read(groupId, instance);
+          }
+        } catch (Throwable t) {
+          logger.error("Execute FragmentInstance in ConsensusGroup {} 
failed.", groupId, t);
+          throw new FragmentInstanceDispatchException(
+              RpcUtils.getStatus(
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                  "Execute FragmentInstance failed. " + t.getMessage()));
         }
         if (!readResponse.isSuccess()) {
           logger.error(
-              "dispatch FragmentInstance {} locally failed because {}",
+              "dispatch FragmentInstance {} locally failed. ",
               instance,
               readResponse.getException());
-          return false;
+          throw new FragmentInstanceDispatchException(
+              RpcUtils.getStatus(
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                  "Execute FragmentInstance failed: "
+                      + (readResponse.getException() == null
+                          ? ""
+                          : readResponse.getException().getMessage())));
+        } else {
+          FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
+          if (info.getState().isFailed()) {
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
info.getMessage()));
+          }
         }
-        return !((FragmentInstanceInfo) 
readResponse.getDataset()).getState().isFailed();
+        return; // READ dispatched successfully; must not fall through into WRITE
       case WRITE:
         PlanNode planNode = instance.getFragment().getRoot();
         boolean hasFailedMeasurement = false;
@@ -258,11 +269,13 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
               RpcUtils.getStatus(
                   TSStatusCode.METADATA_ERROR.getStatusCode(), 
partialInsertMessage));
         }
-
-        return true;
+        return; // WRITE succeeded; the throw after the switch is for unknown types only
     }
-    throw new UnsupportedOperationException(
-        String.format("unknown query type [%s]", instance.getType()));
+
+    throw new FragmentInstanceDispatchException(
+        RpcUtils.getStatus(
+            TSStatusCode.INTERNAL_SERVER_ERROR,
+            String.format("unknown query type [%s]", instance.getType())));
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java
index eaa55d1d3f..2fc7ad4976 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.iotdb.db.protocol.mpprest.handler;
 
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -47,7 +48,7 @@ public class QueryDataSetHandler {
    */
   public static Response fillQueryDataSet(
       IQueryExecution queryExecution, Statement statement, int 
actualRowSizeLimit)
-      throws IOException {
+      throws IOException, IoTDBException {
     if (statement instanceof ShowStatement) {
       return fillShowPlanDataSet(queryExecution, actualRowSizeLimit);
     } else if (statement instanceof QueryStatement) {
@@ -67,7 +68,8 @@ public class QueryDataSetHandler {
   }
 
   public static Response fillDataSetWithTimestamps(
-      IQueryExecution queryExecution, final int actualRowSizeLimit, final long 
timePrecision) {
+      IQueryExecution queryExecution, final int actualRowSizeLimit, final long 
timePrecision)
+      throws IoTDBException {
     org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet =
         new org.apache.iotdb.db.protocol.rest.model.QueryDataSet();
     DatasetHeader header = queryExecution.getDatasetHeader();
@@ -81,7 +83,7 @@ public class QueryDataSetHandler {
   }
 
   public static Response fillAggregationPlanDataSet(
-      IQueryExecution queryExecution, final int actualRowSizeLimit) {
+      IQueryExecution queryExecution, final int actualRowSizeLimit) throws 
IoTDBException {
 
     org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet =
         new org.apache.iotdb.db.protocol.rest.model.QueryDataSet();
@@ -97,7 +99,7 @@ public class QueryDataSetHandler {
   }
 
   private static Response fillShowPlanDataSet(
-      IQueryExecution queryExecution, final int actualRowSizeLimit) throws 
IOException {
+      IQueryExecution queryExecution, final int actualRowSizeLimit) throws 
IoTDBException {
     org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet =
         new org.apache.iotdb.db.protocol.rest.model.QueryDataSet();
     initTargetDatasetOrderByOrderWithSourceDataSet(
@@ -135,7 +137,8 @@ public class QueryDataSetHandler {
       IQueryExecution queryExecution,
       int actualRowSizeLimit,
       org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet,
-      final long timePrecision) {
+      final long timePrecision)
+      throws IoTDBException {
     int fetched = 0;
     int columnNum = queryExecution.getOutputValueColumnCount();
     while (fetched < actualRowSizeLimit) {
@@ -177,7 +180,8 @@ public class QueryDataSetHandler {
   private static Response fillQueryDataSetWithoutTimestamps(
       IQueryExecution queryExecution,
       int actualRowSizeLimit,
-      org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet) {
+      org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet)
+      throws IoTDBException {
     int fetched = 0;
     int columnNum = queryExecution.getOutputValueColumnCount();
     while (fetched < actualRowSizeLimit) {
@@ -210,7 +214,7 @@ public class QueryDataSetHandler {
   }
 
   public static Response fillGrafanaVariablesResult(
-      IQueryExecution queryExecution, Statement statement) {
+      IQueryExecution queryExecution, Statement statement) throws 
IoTDBException {
     List<String> results = new ArrayList<>();
     Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
     if (!optionalTsBlock.isPresent()) {
@@ -232,7 +236,8 @@ public class QueryDataSetHandler {
     return Response.ok().entity(results).build();
   }
 
-  public static Response fillGrafanaNodesResult(IQueryExecution 
queryExecution) throws IOException {
+  public static Response fillGrafanaNodesResult(IQueryExecution queryExecution)
+      throws IoTDBException {
     List<String> nodes = new ArrayList<>();
     Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
     if (!optionalTsBlock.isPresent()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 844ffa66ad..550ef32f60 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -545,7 +545,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       if (s == null) {
         return RpcUtils.getTSExecuteStatementResp(
             RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is 
not supported"));
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not 
supported"));
       }
       // permission check
       TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 16b04f4e7b..048d791e39 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
+import com.google.common.collect.ImmutableList;
 import io.airlift.concurrent.SetThreadName;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -118,26 +119,63 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TSendFragmentInstanceResp 
sendFragmentInstance(TSendFragmentInstanceReq req) {
     LOGGER.info("receive FragmentInstance to group[{}]", 
req.getConsensusGroupId());
-    ConsensusGroupId groupId =
-        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    ConsensusReadResponse readResponse;
+
+    // deserialize ConsensusGroupId
+    ConsensusGroupId groupId;
+    try {
+      groupId = 
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    } catch (Throwable t) {
+      LOGGER.error("Deserialize ConsensusGroupId failed. ", t);
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+      resp.setMessage("Deserialize ConsensusGroupId failed: " + 
t.getMessage());
+      return resp;
+    }
+
     // We deserialize here instead of the underlying state machine because 
parallelism is possible
     // here but not at the underlying state machine
-    FragmentInstance fragmentInstance = 
FragmentInstance.deserializeFrom(req.fragmentInstance.body);
-    if (groupId instanceof DataRegionId) {
-      readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
-    } else {
-      readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
+    FragmentInstance fragmentInstance;
+    try {
+      fragmentInstance = 
FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+    } catch (Throwable t) {
+      LOGGER.error("Deserialize FragmentInstance failed.", t);
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+      resp.setMessage("Deserialize FragmentInstance failed: " + 
t.getMessage());
+      return resp;
     }
-    if (!readResponse.isSuccess()) {
+
+    // execute fragment instance in state machine
+    ConsensusReadResponse readResponse;
+    try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
+      if (groupId instanceof DataRegionId) {
+        readResponse = DataRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
+      } else {
+        readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, 
fragmentInstance);
+      }
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
+      if (!readResponse.isSuccess()) {
+        LOGGER.error(
+            "Execute FragmentInstance in ConsensusGroup {} failed.",
+            req.getConsensusGroupId(),
+            readResponse.getException());
+        resp.setAccepted(false);
+        resp.setMessage(
+            "Execute FragmentInstance failed: "
+                + (readResponse.getException() == null
+                    ? ""
+                    : readResponse.getException().getMessage()));
+      } else {
+        FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
+        resp.setAccepted(!info.getState().isFailed());
+        resp.setMessage(info.getMessage());
+      }
+      return resp;
+    } catch (Throwable t) {
       LOGGER.error(
-          "execute FragmentInstance in ConsensusGroup {} failed because {}",
-          req.getConsensusGroupId(),
-          readResponse.getException());
-      return new TSendFragmentInstanceResp(false);
+          "Execute FragmentInstance in ConsensusGroup {} failed.", 
req.getConsensusGroupId(), t);
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+      resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+      return resp;
     }
-    FragmentInstanceInfo info = (FragmentInstanceInfo) 
readResponse.getDataset();
-    return new TSendFragmentInstanceResp(!info.getState().isFailed());
   }
 
   @Override
@@ -204,11 +242,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TFragmentInstanceStateResp 
fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
     FragmentInstanceId instanceId = 
FragmentInstanceId.fromThrift(req.fragmentInstanceId);
-    try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) 
{
-      FragmentInstanceInfo info = 
FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
-      return info != null
-          ? new TFragmentInstanceStateResp(info.getState().toString())
-          : new 
TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
+    FragmentInstanceInfo info = 
FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
+    if (info != null) {
+      TFragmentInstanceStateResp resp = new 
TFragmentInstanceStateResp(info.getState().toString());
+      resp.setFailedMessages(ImmutableList.of(info.getMessage()));
+      return resp;
+    } else {
+      return new 
TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 3b8d2bd608..f7e9640455 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
@@ -178,7 +179,7 @@ public class QueryDataSetUtils {
   }
 
   public static TSQueryDataSet convertTsBlockByFetchSize(
-      IQueryExecution queryExecution, int fetchSize) throws IOException {
+      IQueryExecution queryExecution, int fetchSize) throws IOException, 
IoTDBException {
     int columnNum = queryExecution.getOutputValueColumnCount();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
     // one time column and each value column has an actual value buffer and a 
bitmap value to
diff --git a/thrift/src/main/thrift/datanode.thrift 
b/thrift/src/main/thrift/datanode.thrift
index 3ceeefb0c4..b0371aafc6 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -116,6 +116,7 @@ struct TFetchFragmentInstanceStateReq {
 // TODO: need to supply more fields according to implementation
 struct TFragmentInstanceStateResp {
   1: required string state
+  2: optional list<string> failedMessages
 }
 
 struct TCancelQueryReq {

Reply via email to