This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 571dcbe19c9 [RTO/RPO] Coordinator/Session Failover Retry (#15269)
571dcbe19c9 is described below

commit 571dcbe19c9ab16a53a5e6b1d4239cf1a9daf32d
Author: William Song <[email protected]>
AuthorDate: Tue Apr 22 14:33:03 2025 +0800

    [RTO/RPO] Coordinator/Session Failover Retry (#15269)
    
    * rto/rpo retry
    
    * enable restart it
    
    * add apache license
    
    * add apache license
    
    * add apache license
    
    * re run CI
    
    * add apache license
    
    * add retry in jdbc
    
    * remove reconnect on jdbc
    
    * Debug
    
    * do value copy
    
    * add license
    
    * spotless
    
    * Debug2
    
    * add reconnect
    
    * partial
    
    * Update MeasurementGroup.java
    
    * Update AutoCreateSchemaExecutor.java
    
    * Update MeasurementGroup.java
    
    * Update MeasurementGroup.java
    
    * maybe final
    
    * Debug logger removal
    
    * partial
    
    * Removed useless
    
    * add and retry
    
    ---------
    
    Co-authored-by: Caideyipi <[email protected]>
---
 .../org/apache/iotdb/db/it/IoTDBRestartIT.java     |   2 -
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |   8 -
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 134 ++++++-----
 .../apache/iotdb/session/SessionConnection.java    | 148 +++++++++---
 .../iotdb/db/queryengine/plan/Coordinator.java     |   4 -
 .../analyze/schema/AutoCreateSchemaExecutor.java   |  14 +-
 .../analyze/schema/ClusterSchemaFetchExecutor.java |   9 +-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   1 -
 .../queryengine/plan/planner/TreeModelPlanner.java |   2 -
 .../plan/node/metadata/write/MeasurementGroup.java |  25 ++-
 .../plan/relational/planner/TableModelPlanner.java |   9 -
 .../plan/scheduler/AsyncPlanNodeSender.java        |  21 +-
 .../plan/scheduler/ClusterScheduler.java           |   5 -
 .../FailedFragmentInstanceWithStatus.java          |  41 ++++
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 248 ++++++++++++---------
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |   4 +-
 .../exception/QuerySchemaFetchFailedException.java |  30 +++
 .../apache/iotdb/commons/utils/StatusUtils.java    |   1 -
 18 files changed, 461 insertions(+), 245 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
index 603196e871a..14d298e31bd 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -47,7 +46,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
-@Ignore("enable this after RTO/RPO retry")
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBRestartIT {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index fef5dc2e469..2a64278b1e8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -20,14 +20,12 @@
 package org.apache.iotdb.db.it.utils;
 
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.itbase.env.BaseEnv;
-import org.apache.iotdb.itbase.env.BaseNodeWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -45,7 +43,6 @@ import java.sql.Statement;
 import java.text.DateFormat;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1709,11 +1706,6 @@ public class TestUtils {
     long retryIntervalMS = 1000;
     while (true) {
       try (Connection connection = EnvFactory.getEnv().getConnection()) {
-        final List<BaseNodeWrapper> allDataNodes =
-            new ArrayList<>(EnvFactory.getEnv().getDataNodeWrapperList());
-        EnvFactory.getEnv()
-            .ensureNodeStatus(
-                allDataNodes, Collections.nCopies(allDataNodes.size(), 
NodeStatus.Running));
         break;
       } catch (Exception e) {
         try {
diff --git 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index c36d905dd90..820f8e19337 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -45,6 +45,8 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.apache.iotdb.jdbc.Constant.TABLE;
 import static org.apache.iotdb.jdbc.Constant.TREE;
@@ -274,18 +276,10 @@ public class IoTDBStatement implements Statement {
     try {
       return executeSQL(sql);
     } catch (TException e) {
-      if (reConnect()) {
-        try {
-          return executeSQL(sql);
-        } catch (TException e2) {
-          throw new SQLException(e2);
-        }
-      } else {
-        throw new SQLException(
-            String.format(
-                "Fail to reconnect to server when executing %s. please check 
server status", sql),
-            e);
-      }
+      throw new SQLException(
+          String.format(
+              "Fail to reconnect to server when executing %s. please check 
server status", sql),
+          e);
     }
   }
 
@@ -304,6 +298,54 @@ public class IoTDBStatement implements Statement {
     throw new SQLException(NOT_SUPPORT_EXECUTE);
   }
 
+  private interface TFunction<T> {
+    T run() throws TException;
+  }
+
+  private <T> T callWithRetryAndReconnect(TFunction<T> rpc, Function<T, 
TSStatus> statusGetter)
+      throws SQLException, TException {
+    TException lastTException = null;
+    T result = null;
+    int retryAttempt;
+    int maxRetryCount = 5;
+    int retryIntervalInMs = 1000;
+    for (retryAttempt = 0; retryAttempt <= maxRetryCount; retryAttempt++) {
+      // 1. try to execute the rpc
+      try {
+        result = rpc.run();
+        lastTException = null;
+      } catch (TException e) {
+        result = null;
+        lastTException = e;
+      }
+
+      TSStatus status = null;
+      if (result != null) {
+        status = statusGetter.apply(result);
+      }
+      // success, return immediately
+      if (status != null && !(status.isSetNeedRetry() && 
status.isNeedRetry())) {
+        return result;
+      }
+
+      // prepare for the next retry
+      if (lastTException != null) {
+        reConnect();
+      }
+      try {
+        TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    if (result == null && lastTException != null) {
+      throw lastTException;
+    }
+    return result;
+  }
+
   /**
    * There are two kinds of sql here: (1) query sql (2) update sql.
    *
@@ -318,7 +360,9 @@ public class IoTDBStatement implements Statement {
     }
     execReq.setFetchSize(rows);
     execReq.setTimeout((long) queryTimeout * 1000);
-    TSExecuteStatementResp execResp = client.executeStatementV2(execReq);
+    TSExecuteStatementResp execResp =
+        callWithRetryAndReconnect(
+            () -> client.executeStatementV2(execReq), 
TSExecuteStatementResp::getStatus);
     try {
       RpcUtils.verifySuccess(execResp.getStatus());
     } catch (StatementExecutionException e) {
@@ -370,26 +414,18 @@ public class IoTDBStatement implements Statement {
     try {
       return executeBatchSQL();
     } catch (TException e) {
-      if (reConnect()) {
-        try {
-          return executeBatchSQL();
-        } catch (TException e2) {
-          throw new SQLException(
-              "Fail to execute batch sqls after reconnecting. please check 
server status", e2);
-        }
-      } else {
-        throw new SQLException(
-            "Fail to reconnect to server when executing batch sqls. please 
check server status", e);
-      }
+      throw new SQLException(
+          "Fail to reconnect to server when executing batch sqls. please check 
server status", e);
     } finally {
       clearBatch();
     }
   }
 
-  private int[] executeBatchSQL() throws TException, BatchUpdateException {
+  private int[] executeBatchSQL() throws TException, BatchUpdateException, 
SQLException {
     isCancelled = false;
     TSExecuteBatchStatementReq execReq = new 
TSExecuteBatchStatementReq(sessionId, batchSQLList);
-    TSStatus execResp = client.executeBatchStatement(execReq);
+    TSStatus execResp =
+        callWithRetryAndReconnect(() -> client.executeBatchStatement(execReq), 
status -> status);
     int[] result = new int[batchSQLList.size()];
     boolean allSuccess = true;
     StringBuilder message = new StringBuilder(System.lineSeparator());
@@ -444,20 +480,9 @@ public class IoTDBStatement implements Statement {
     try {
       return executeQuerySQL(sql, timeoutInMS);
     } catch (TException e) {
-      if (reConnect()) {
-        try {
-          return executeQuerySQL(sql, timeoutInMS);
-        } catch (TException e2) {
-          throw new SQLException(
-              "Fail to executeQuery " + sql + "after reconnecting. please 
check server status", e2);
-        }
-      } else {
-        throw new SQLException(
-            "Fail to reconnect to server when execute query "
-                + sql
-                + ". please check server status",
-            e);
-      }
+      throw new SQLException(
+          "Fail to reconnect to server when execute query " + sql + ". please 
check server status",
+          e);
     }
   }
 
@@ -471,7 +496,9 @@ public class IoTDBStatement implements Statement {
     execReq.setFetchSize(rows);
     execReq.setTimeout(timeoutInMS);
     execReq.setJdbcQuery(true);
-    TSExecuteStatementResp execResp = client.executeQueryStatementV2(execReq);
+    TSExecuteStatementResp execResp =
+        callWithRetryAndReconnect(
+            () -> client.executeQueryStatementV2(execReq), 
TSExecuteStatementResp::getStatus);
     queryId = execResp.getQueryId();
     try {
       RpcUtils.verifySuccess(execResp.getStatus());
@@ -520,21 +547,9 @@ public class IoTDBStatement implements Statement {
     try {
       return executeUpdateSQL(sql);
     } catch (TException e) {
-      if (reConnect()) {
-        try {
-          return executeUpdateSQL(sql);
-        } catch (TException e2) {
-          throw new SQLException(
-              "Fail to execute update " + sql + "after reconnecting. please 
check server status",
-              e2);
-        }
-      } else {
-        throw new SQLException(
-            "Fail to reconnect to server when execute update "
-                + sql
-                + ". please check server status",
-            e);
-      }
+      throw new SQLException(
+          "Fail to reconnect to server when execute update " + sql + ". please 
check server status",
+          e);
     }
   }
 
@@ -553,9 +568,12 @@ public class IoTDBStatement implements Statement {
     throw new SQLException(NOT_SUPPORT_EXECUTE_UPDATE);
   }
 
-  private int executeUpdateSQL(final String sql) throws TException, 
IoTDBSQLException {
+  private int executeUpdateSQL(final String sql)
+      throws TException, IoTDBSQLException, SQLException {
     final TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, 
sql, stmtId);
-    final TSExecuteStatementResp execResp = 
client.executeUpdateStatement(execReq);
+    final TSExecuteStatementResp execResp =
+        callWithRetryAndReconnect(
+            () -> client.executeUpdateStatement(execReq), 
TSExecuteStatementResp::getStatus);
     if (execResp.isSetQueryId()) {
       queryId = execResp.getQueryId();
     }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 9dbbbc343e2..58d46d4b5a3 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
 import 
org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
@@ -83,6 +84,8 @@ import java.util.List;
 import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static org.apache.iotdb.session.Session.TABLE;
@@ -288,7 +291,7 @@ public class SessionConnection {
   protected void setTimeZone(String zoneId)
       throws StatementExecutionException, IoTDBConnectionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, 
zoneId);
                   return client.setTimeZone(req);
@@ -312,21 +315,23 @@ public class SessionConnection {
   protected void setStorageGroup(String storageGroup)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(() -> client.setStorageGroup(sessionId, 
storageGroup)).getResult();
+        callWithRetryAndReconnect(() -> client.setStorageGroup(sessionId, 
storageGroup))
+            .getResult();
     RpcUtils.verifySuccess(status);
   }
 
   protected void deleteStorageGroups(List<String> storageGroups)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(() -> client.deleteStorageGroups(sessionId, 
storageGroups)).getResult();
+        callWithRetryAndReconnect(() -> client.deleteStorageGroups(sessionId, 
storageGroups))
+            .getResult();
     RpcUtils.verifySuccess(status);
   }
 
   protected void createTimeseries(TSCreateTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.createTimeseries(request);
@@ -338,7 +343,7 @@ public class SessionConnection {
   protected void createAlignedTimeseries(TSCreateAlignedTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.createAlignedTimeseries(request);
@@ -350,7 +355,7 @@ public class SessionConnection {
   protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.createMultiTimeseries(request);
@@ -384,12 +389,13 @@ public class SessionConnection {
     execReq.setEnableRedirectQuery(enableRedirect);
 
     RetryResult<TSExecuteStatementResp> result =
-        callWithReconnect(
+        callWithRetryAndReconnect(
             () -> {
               execReq.setSessionId(sessionId);
               execReq.setStatementId(statementId);
               return client.executeQueryStatementV2(execReq);
-            });
+            },
+            TSExecuteStatementResp::getStatus);
     TSExecuteStatementResp execResp = result.getResult();
     if (result.getRetryAttempts() == 0) {
       RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
@@ -453,12 +459,13 @@ public class SessionConnection {
     execReq.setEnableRedirectQuery(enableRedirect);
 
     RetryResult<TSExecuteStatementResp> result =
-        callWithReconnect(
+        callWithRetryAndReconnect(
             () -> {
               execReq.setSessionId(sessionId);
               execReq.setStatementId(statementId);
               return client.executeRawDataQueryV2(execReq);
-            });
+            },
+            TSExecuteStatementResp::getStatus);
 
     TSExecuteStatementResp execResp = result.getResult();
     if (result.getRetryAttempts() == 0) {
@@ -497,12 +504,13 @@ public class SessionConnection {
     TEndPoint redirectedEndPoint = null;
 
     RetryResult<TSExecuteStatementResp> result =
-        callWithReconnect(
+        callWithRetryAndReconnect(
             () -> {
               req.setSessionId(sessionId);
               req.setStatementId(statementId);
               return client.executeFastLastDataQueryForOneDeviceV2(req);
-            });
+            },
+            TSExecuteStatementResp::getStatus);
 
     TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
     if (result.getRetryAttempts() == 0) {
@@ -544,12 +552,13 @@ public class SessionConnection {
     tsLastDataQueryReq.setTimeout(timeOut);
 
     RetryResult<TSExecuteStatementResp> result =
-        callWithReconnect(
+        callWithRetryAndReconnect(
             () -> {
               tsLastDataQueryReq.setSessionId(sessionId);
               tsLastDataQueryReq.setStatementId(statementId);
               return client.executeLastDataQueryV2(tsLastDataQueryReq);
-            });
+            },
+            TSExecuteStatementResp::getStatus);
     final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
 
     if (result.getRetryAttempts() == 0) {
@@ -625,12 +634,13 @@ public class SessionConnection {
   private SessionDataSet executeAggregationQuery(TSAggregationQueryReq 
tsAggregationQueryReq)
       throws StatementExecutionException, IoTDBConnectionException, 
RedirectException {
     RetryResult<TSExecuteStatementResp> result =
-        callWithReconnect(
+        callWithRetryAndReconnect(
             () -> {
               tsAggregationQueryReq.setSessionId(sessionId);
               tsAggregationQueryReq.setStatementId(statementId);
               return client.executeAggregationQueryV2(tsAggregationQueryReq);
-            });
+            },
+            TSExecuteStatementResp::getStatus);
 
     TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
     if (result.getRetryAttempts() == 0) {
@@ -852,6 +862,73 @@ public class SessionConnection {
     return new RetryResult<>(status, lastTException, i);
   }
 
+  private RetryResult<TSStatus> callWithRetryAndReconnect(TFunction<TSStatus> 
rpc) {
+    return callWithRetryAndReconnect(
+        rpc,
+        status -> status.isSetNeedRetry() && status.isNeedRetry(),
+        status -> status.getCode() == 
TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
+  }
+
+  private <T> RetryResult<T> callWithRetryAndReconnect(
+      TFunction<T> rpc, Function<T, TSStatus> statusGetter) {
+    return callWithRetryAndReconnect(
+        rpc,
+        t -> {
+          final TSStatus status = statusGetter.apply(t);
+          return status.isSetNeedRetry() && status.isNeedRetry();
+        },
+        t ->
+            statusGetter.apply(t).getCode()
+                == TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
+  }
+
+  /** Reconnect if the remote datanode is unreachable; retry if the status is 
set to needRetry. */
+  private <T> RetryResult<T> callWithRetryAndReconnect(
+      TFunction<T> rpc, Predicate<T> shouldRetry, Predicate<T> forceReconnect) 
{
+    TException lastTException = null;
+    T result = null;
+    int retryAttempt;
+    int maxRetryCountRead = 10;
+    for (retryAttempt = 0; retryAttempt <= maxRetryCountRead; retryAttempt++) {
+      // 1. try to execute the rpc
+      try {
+        result = rpc.run();
+        lastTException = null;
+      } catch (TException e) {
+        result = null;
+        lastTException = e;
+      }
+
+      // success, return immediately
+      if (result != null && !shouldRetry.test(result)) {
+        return new RetryResult<>(result, null, retryAttempt);
+      }
+
+      // prepare for the next retry
+      if (lastTException != null
+          || !availableNodes.get().contains(this.endPoint)
+          || (result != null && forceReconnect.test(result))) {
+        // 1. the current datanode is unreachable (TException)
+        // 2. the current datanode is partitioned from other nodes (not in 
availableNodes)
+        // 3. asymmetric network partition (PLAN_FAILED_NETWORK_PARTITION status)
+        reconnect();
+      }
+      try {
+        TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn(
+            "Thread {} was interrupted during retry {} with wait time {} ms. 
Exiting retry loop.",
+            Thread.currentThread().getName(),
+            retryAttempt,
+            retryIntervalInMs);
+        break;
+      }
+    }
+
+    return new RetryResult<>(result, lastTException, retryAttempt);
+  }
+
   private TSStatus deleteDataInternal(TSDeleteDataReq request) throws 
TException {
     request.setSessionId(sessionId);
     return client.deleteData(request);
@@ -860,7 +937,7 @@ public class SessionConnection {
   protected void testInsertRecord(TSInsertStringRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.testInsertStringRecord(request);
@@ -872,7 +949,7 @@ public class SessionConnection {
   protected void testInsertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.testInsertRecord(request);
@@ -884,7 +961,7 @@ public class SessionConnection {
   public void testInsertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.testInsertStringRecords(request);
@@ -896,7 +973,7 @@ public class SessionConnection {
   public void testInsertRecords(TSInsertRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.testInsertRecords(request);
@@ -908,7 +985,7 @@ public class SessionConnection {
   protected void testInsertTablet(TSInsertTabletReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.testInsertTablet(request);
@@ -920,7 +997,7 @@ public class SessionConnection {
   protected void testInsertTablets(TSInsertTabletsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.testInsertTablets(request);
@@ -980,7 +1057,7 @@ public class SessionConnection {
   protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.createSchemaTemplate(request);
@@ -992,7 +1069,7 @@ public class SessionConnection {
   protected void appendSchemaTemplate(TSAppendSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.appendSchemaTemplate(request);
@@ -1004,7 +1081,7 @@ public class SessionConnection {
   protected void pruneSchemaTemplate(TSPruneSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.pruneSchemaTemplate(request);
@@ -1016,11 +1093,12 @@ public class SessionConnection {
   protected TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req)
       throws StatementExecutionException, IoTDBConnectionException {
     final TSQueryTemplateResp execResp =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   req.setSessionId(sessionId);
                   return client.querySchemaTemplate(req);
-                })
+                },
+                TSQueryTemplateResp::getStatus)
             .getResult();
     RpcUtils.verifySuccess(execResp.getStatus());
     return execResp;
@@ -1029,7 +1107,7 @@ public class SessionConnection {
   protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.setSchemaTemplate(request);
@@ -1041,7 +1119,7 @@ public class SessionConnection {
   protected void unsetSchemaTemplate(TSUnsetSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.unsetSchemaTemplate(request);
@@ -1053,7 +1131,7 @@ public class SessionConnection {
   protected void dropSchemaTemplate(TSDropSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.dropSchemaTemplate(request);
@@ -1066,7 +1144,7 @@ public class SessionConnection {
       TCreateTimeseriesUsingSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     final TSStatus status =
-        callWithReconnect(
+        callWithRetryAndReconnect(
                 () -> {
                   request.setSessionId(sessionId);
                   return client.createTimeseriesUsingSchemaTemplate(request);
@@ -1078,7 +1156,9 @@ public class SessionConnection {
   protected TSBackupConfigurationResp getBackupConfiguration()
       throws IoTDBConnectionException, StatementExecutionException {
     final TSBackupConfigurationResp execResp =
-        callWithReconnect(() -> client.getBackupConfiguration()).getResult();
+        callWithRetryAndReconnect(
+                () -> client.getBackupConfiguration(), 
TSBackupConfigurationResp::getStatus)
+            .getResult();
     RpcUtils.verifySuccess(execResp.getStatus());
     return execResp;
   }
@@ -1104,7 +1184,9 @@ public class SessionConnection {
   }
 
   public TSConnectionInfoResp fetchAllConnections() throws 
IoTDBConnectionException {
-    return callWithReconnect(() -> 
client.fetchAllConnectionsInfo()).getResult();
+    return callWithRetryAndReconnect(
+            () -> client.fetchAllConnectionsInfo(), resp -> false, resp -> 
false)
+        .getResult();
   }
 
   public boolean isEnableRedirect() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 39f07c4c62d..404ed022966 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -375,8 +375,6 @@ public class Coordinator {
             statement.toRelationalStatement(queryContext),
             sqlParser,
             metadata,
-            executor,
-            writeOperationExecutor,
             scheduledExecutor,
             SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
             ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
@@ -463,8 +461,6 @@ public class Coordinator {
             statement,
             sqlParser,
             metadata,
-            executor,
-            writeOperationExecutor,
             scheduledExecutor,
             SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
             ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 76f88bb18ca..80b17a2a45c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -590,9 +590,19 @@ class AutoCreateSchemaExecutor {
       Map<PartialPath, Pair<Boolean, MeasurementGroup>> 
devicesNeedAutoCreateTimeSeries,
       MPPQueryContext context) {
 
-    List<MeasurementPath> measurementPathList =
+    // Deep copy to avoid changes to the original map
+    final List<MeasurementPath> measurementPathList =
         executeInternalCreateTimeseriesStatement(
-            new 
InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries), 
context);
+            new InternalCreateMultiTimeSeriesStatement(
+                devicesNeedAutoCreateTimeSeries.entrySet().stream()
+                    .collect(
+                        Collectors.toMap(
+                            Map.Entry::getKey,
+                            entry ->
+                                new Pair<>(
+                                    entry.getValue().getLeft(),
+                                    entry.getValue().getRight().deepCopy())))),
+            context);
 
     schemaTree.appendMeasurementPaths(measurementPathList);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 16f5420413e..2153a0ccf4b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.queryengine.plan.analyze.schema;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.QuerySchemaFetchFailedException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
@@ -251,11 +251,11 @@ class ClusterSchemaFetchExecutor {
     try {
       ExecutionResult executionResult = executionStatement(queryId, 
fetchStatement, context);
       if (executionResult.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new IoTDBRuntimeException(
+        throw new QuerySchemaFetchFailedException(
             String.format("Fetch Schema failed, because %s", 
executionResult.status.getMessage()),
             executionResult.status.getCode());
       }
-      try (SetThreadName threadName = new 
SetThreadName(executionResult.queryId.getId())) {
+      try (SetThreadName ignored = new 
SetThreadName(executionResult.queryId.getId())) {
         ClusterSchemaTree result = new ClusterSchemaTree();
         Set<String> databaseSet = new HashSet<>();
         while (coordinator.getQueryExecution(queryId).hasNextResult()) {
@@ -266,7 +266,8 @@ class ClusterSchemaFetchExecutor {
             tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
           } catch (IoTDBException e) {
             t = e;
-            throw new RuntimeException("Fetch Schema failed. ", e);
+            throw new QuerySchemaFetchFailedException(
+                String.format("Fetch Schema failed: %s", e.getMessage()), 
e.getErrorCode());
           }
           if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
             break;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
index 3008e03237e..536a06bb09c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -53,7 +53,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 public class ClusterSchemaFetcher implements ISchemaFetcher {
-
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private final Coordinator coordinator = Coordinator.getInstance();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 88bdeb87a38..eaaacad64ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -148,8 +148,6 @@ public class TreeModelPlanner implements IPlanner {
               stateMachine,
               distributedPlan.getInstances(),
               context.getQueryType(),
-              executor,
-              writeOperationExecutor,
               scheduledExecutor,
               syncInternalServiceClientManager,
               asyncInternalServiceClientManager);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
index bce60638e1e..8667f6e21e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
@@ -45,7 +45,7 @@ public class MeasurementGroup {
   private List<Map<String, String>> tagsList;
   private List<Map<String, String>> attributesList;
 
-  private final transient Set<String> measurementSet = new HashSet<>();
+  private transient Set<String> measurementSet = new HashSet<>();
 
   public List<String> getMeasurements() {
     return measurements;
@@ -394,6 +394,29 @@ public class MeasurementGroup {
     }
   }
 
+  // This won't be affected by "removeMeasurements"
+  public MeasurementGroup deepCopy() {
+    final MeasurementGroup result = new MeasurementGroup();
+    result.measurements =
+        Objects.nonNull(this.measurements) ? new 
ArrayList<>(this.measurements) : null;
+    result.dataTypes = Objects.nonNull(this.dataTypes) ? new 
ArrayList<>(this.dataTypes) : null;
+    result.encodings = Objects.nonNull(this.encodings) ? new 
ArrayList<>(this.encodings) : null;
+    result.compressors =
+        Objects.nonNull(this.compressors) ? new ArrayList<>(this.compressors) 
: null;
+    result.aliasList = Objects.nonNull(this.aliasList) ? new 
ArrayList<>(this.aliasList) : null;
+    result.propsList = Objects.nonNull(this.propsList) ? new 
ArrayList<>(this.propsList) : null;
+    result.tagsList = Objects.nonNull(this.tagsList) ? new 
ArrayList<>(this.tagsList) : null;
+    result.attributesList =
+        Objects.nonNull(this.attributesList) ? new 
ArrayList<>(this.attributesList) : null;
+    result.measurementSet = new HashSet<>(measurementSet);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.nonNull(measurements) ? measurements.toString() : "null";
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index f8d39ddccf0..a5765ee538c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -58,7 +58,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
@@ -78,8 +77,6 @@ public class TableModelPlanner implements IPlanner {
 
   private final WarningCollector warningCollector = WarningCollector.NOOP;
 
-  private final ExecutorService executor;
-  private final ExecutorService writeOperationExecutor;
   private final ScheduledExecutorService scheduledExecutor;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
@@ -94,8 +91,6 @@ public class TableModelPlanner implements IPlanner {
       final Statement statement,
       final SqlParser sqlParser,
       final Metadata metadata,
-      final ExecutorService executor,
-      final ExecutorService writeOperationExecutor,
       final ScheduledExecutorService scheduledExecutor,
       final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
           syncInternalServiceClientManager,
@@ -109,8 +104,6 @@ public class TableModelPlanner implements IPlanner {
     this.statement = statement;
     this.sqlParser = sqlParser;
     this.metadata = metadata;
-    this.executor = executor;
-    this.writeOperationExecutor = writeOperationExecutor;
     this.scheduledExecutor = scheduledExecutor;
     this.syncInternalServiceClientManager = syncInternalServiceClientManager;
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
@@ -197,8 +190,6 @@ public class TableModelPlanner implements IPlanner {
               stateMachine,
               distributedPlan.getInstances(),
               context.getQueryType(),
-              executor,
-              writeOperationExecutor,
               scheduledExecutor,
               syncInternalServiceClientManager,
               asyncInternalServiceClientManager);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index 230eb27941e..fc1e3d049d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -109,19 +109,24 @@ public class AsyncPlanNodeSender {
     }
   }
 
-  public List<TSStatus> getFailureStatusList() {
-    List<TSStatus> failureStatusList = new ArrayList<>();
+  public List<FailedFragmentInstanceWithStatus> 
getFailedInstancesWithStatuses() {
+    List<FailedFragmentInstanceWithStatus> 
failureFragmentInstanceWithStatusList =
+        new ArrayList<>();
     TSStatus status;
     for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry : 
instanceId2RespMap.entrySet()) {
       status = entry.getValue().getStatus();
+      final FragmentInstance instance = instances.get(entry.getKey());
       if (!entry.getValue().accepted) {
         if (status == null) {
           LOGGER.warn(
               "dispatch write failed. message: {}, node {}",
               entry.getValue().message,
               
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
-          failureStatusList.add(
-              RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, 
entry.getValue().getMessage()));
+          failureFragmentInstanceWithStatusList.add(
+              new FailedFragmentInstanceWithStatus(
+                  instance,
+                  RpcUtils.getStatus(
+                      TSStatusCode.WRITE_PROCESS_ERROR, 
entry.getValue().getMessage())));
         } else {
           LOGGER.warn(
               "dispatch write failed. status: {}, code: {}, message: {}, node 
{}",
@@ -129,16 +134,18 @@ public class AsyncPlanNodeSender {
               TSStatusCode.representOf(status.code),
               entry.getValue().message,
               
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
-          failureStatusList.add(status);
+          failureFragmentInstanceWithStatusList.add(
+              new FailedFragmentInstanceWithStatus(instance, status));
         }
       } else {
         // some expected and accepted status except SUCCESS_STATUS need to be 
returned
         if (status != null && status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          failureStatusList.add(status);
+          failureFragmentInstanceWithStatusList.add(
+              new FailedFragmentInstanceWithStatus(instance, status));
         }
       }
     }
-    return failureStatusList;
+    return failureFragmentInstanceWithStatusList;
   }
 
   public boolean needRetry() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index b3e9acdc00d..b74dba62c15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -71,8 +70,6 @@ public class ClusterScheduler implements IScheduler {
       QueryStateMachine stateMachine,
       List<FragmentInstance> instances,
       QueryType queryType,
-      ExecutorService executor,
-      ExecutorService writeOperationExecutor,
       ScheduledExecutorService scheduledExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncInternalServiceClientManager,
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -84,8 +81,6 @@ public class ClusterScheduler implements IScheduler {
         new FragmentInstanceDispatcherImpl(
             queryType,
             queryContext,
-            executor,
-            writeOperationExecutor,
             syncInternalServiceClientManager,
             asyncInternalServiceClientManager);
     if (queryType == QueryType.READ) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FailedFragmentInstanceWithStatus.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FailedFragmentInstanceWithStatus.java
new file mode 100644
index 00000000000..8da3bfd2d9e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FailedFragmentInstanceWithStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.scheduler;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+
+public class FailedFragmentInstanceWithStatus {
+  private final FragmentInstance instance;
+  private final TSStatus failureStatus;
+
+  public FailedFragmentInstanceWithStatus(FragmentInstance instance, TSStatus 
failureStatus) {
+    this.instance = instance;
+    this.failureStatus = failureStatus;
+  }
+
+  public FragmentInstance getInstance() {
+    return instance;
+  }
+
+  public TSStatus getFailureStatus() {
+    return failureStatus;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 5aef2e3176c..111a40470ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -56,6 +57,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
+import org.apache.tsfile.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,9 +65,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
+import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -76,9 +79,6 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
       LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
-
-  private final ExecutorService executor;
-  private final ExecutorService writeOperationExecutor;
   private final QueryType type;
   private final MPPQueryContext queryContext;
   private final String localhostIpAddr;
@@ -97,22 +97,24 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
 
   private static final String UNEXPECTED_ERRORS = "Unexpected errors: ";
 
+  private final long maxRetryDurationInNs;
+
   public FragmentInstanceDispatcherImpl(
       QueryType type,
       MPPQueryContext queryContext,
-      ExecutorService executor,
-      ExecutorService writeOperationExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncInternalServiceClientManager,
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
           asyncInternalServiceClientManager) {
     this.type = type;
     this.queryContext = queryContext;
-    this.executor = executor;
-    this.writeOperationExecutor = writeOperationExecutor;
     this.syncInternalServiceClientManager = syncInternalServiceClientManager;
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
     this.localhostIpAddr = 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
     this.localhostInternalPort = 
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+    this.maxRetryDurationInNs =
+        COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
+            ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
+            : 0;
   }
 
   @Override
@@ -120,7 +122,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     if (type == QueryType.READ) {
       return dispatchRead(instances);
     } else {
-      return dispatchWriteAsync(instances);
+      return dispatchWrite(instances);
     }
   }
 
@@ -163,138 +165,170 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     }
   }
 
-  private Future<FragInstanceDispatchResult> 
dispatchWriteSync(List<FragmentInstance> instances) {
-    List<TSStatus> failureStatusList = new ArrayList<>();
-    for (FragmentInstance instance : instances) {
-      try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
-        dispatchOneInstance(instance);
-      } catch (FragmentInstanceDispatchException e) {
-        TSStatus failureStatus = e.getFailureStatus();
-        if (instances.size() == 1) {
-          failureStatusList.add(failureStatus);
-        } else {
-          if (failureStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-            failureStatusList.addAll(failureStatus.getSubStatus());
-          } else {
-            failureStatusList.add(failureStatus);
-          }
-        }
-      } catch (Throwable t) {
-        LOGGER.warn(DISPATCH_FAILED, t);
-        failureStatusList.add(
-            RpcUtils.getStatus(
-                TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage()));
+  /** Entrypoint for dispatching write fragment instances. */
+  private Future<FragInstanceDispatchResult> 
dispatchWrite(List<FragmentInstance> instances) {
+    final List<TSStatus> dispatchFailures = new ArrayList<>();
+    int replicaNum = 0;
+
+    // 1. do not dispatch if the RegionReplicaSet is empty
+    final List<FragmentInstance> shouldDispatch = new ArrayList<>();
+    for (final FragmentInstance instance : instances) {
+      if (instance.getHostDataNode() == null
+          || Optional.ofNullable(instance.getRegionReplicaSet())
+                  .map(TRegionReplicaSet::getDataNodeLocationsSize)
+                  .orElse(0)
+              == 0) {
+        dispatchFailures.add(
+            new 
TSStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()));
+      } else {
+        replicaNum =
+            Math.max(replicaNum, 
instance.getRegionReplicaSet().getDataNodeLocationsSize());
+        shouldDispatch.add(instance);
       }
     }
-    if (failureStatusList.isEmpty()) {
+
+    try {
+      // 2. try the dispatch
+      final List<FailedFragmentInstanceWithStatus> failedInstances =
+          dispatchWriteOnce(shouldDispatch);
+
+      // 3. decide if we need to retry (we may decide the retry condition 
instance-wise, if needed)
+      final boolean shouldRetry =
+          !failedInstances.isEmpty() && maxRetryDurationInNs > 0 && replicaNum 
> 1;
+      if (!shouldRetry) {
+        failedInstances.forEach(fi -> 
dispatchFailures.add(fi.getFailureStatus()));
+      } else {
+        // 4. retry the instance on other replicas
+        final List<FragmentInstance> retryInstances =
+            failedInstances.stream()
+                .map(FailedFragmentInstanceWithStatus::getInstance)
+                .collect(Collectors.toList());
+        // here we only retry over each replica once
+        final List<FailedFragmentInstanceWithStatus> failedAfterRetry =
+            dispatchRetryWrite(retryInstances, replicaNum);
+        failedAfterRetry.forEach(fi -> 
dispatchFailures.add(fi.getFailureStatus()));
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.error("Interrupted when dispatching write async", e);
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + 
e.getMessage())));
+    }
+
+    if (dispatchFailures.isEmpty()) {
       return immediateFuture(new FragInstanceDispatchResult(true));
+    }
+    if (instances.size() == 1) {
+      return immediateFuture(new 
FragInstanceDispatchResult(dispatchFailures.get(0)));
     } else {
-      if (instances.size() == 1) {
-        return immediateFuture(new 
FragInstanceDispatchResult(failureStatusList.get(0)));
-      } else {
-        return immediateFuture(
-            new 
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+      List<TSStatus> failureStatusList = new ArrayList<>();
+      for (TSStatus dataNodeFailure : dispatchFailures) {
+        if (dataNodeFailure.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+          failureStatusList.addAll(dataNodeFailure.getSubStatus());
+        } else {
+          failureStatusList.add(dataNodeFailure);
+        }
       }
+      return immediateFuture(new 
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
     }
   }
 
-  private Future<FragInstanceDispatchResult> 
dispatchWriteAsync(List<FragmentInstance> instances) {
-    List<TSStatus> dataNodeFailureList = new ArrayList<>();
-    // split local and remote instances
-    List<FragmentInstance> localInstances = new ArrayList<>();
-    List<FragmentInstance> remoteInstances = new ArrayList<>();
+  /**
+   * Dispatch the given write instances once. It will dispatch the given 
instances locally or
+   * remotely, depending on the host datanode.
+   */
+  private List<FailedFragmentInstanceWithStatus> 
dispatchWriteOnce(List<FragmentInstance> instances)
+      throws InterruptedException {
+    if (instances.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final List<FragmentInstance> localInstances = new ArrayList<>();
+    final List<FragmentInstance> remoteInstances = new ArrayList<>();
     for (FragmentInstance instance : instances) {
-      if (instance.getHostDataNode() == null) {
-        dataNodeFailureList.add(
-            new 
TSStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()));
-        continue;
-      }
-      TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
-      if (isDispatchedToLocal(endPoint)) {
+      if 
(isDispatchedToLocal(instance.getHostDataNode().getInternalEndPoint())) {
         localInstances.add(instance);
       } else {
         remoteInstances.add(instance);
       }
     }
-    // async dispatch to remote
-    AsyncPlanNodeSender asyncPlanNodeSender =
+
+    final List<FailedFragmentInstanceWithStatus> 
failedFragmentInstanceWithStatuses =
+        new ArrayList<>();
+
+    // 1. async dispatch to remote
+    final AsyncPlanNodeSender asyncPlanNodeSender =
         new AsyncPlanNodeSender(asyncInternalServiceClientManager, 
remoteInstances);
     asyncPlanNodeSender.sendAll();
 
+    // 2. sync dispatch to local
     if (!localInstances.isEmpty()) {
-      // sync dispatch to local
       long localScheduleStartTime = System.nanoTime();
       for (FragmentInstance localInstance : localInstances) {
-        try (SetThreadName threadName = new 
SetThreadName(localInstance.getId().getFullId())) {
+        try (SetThreadName ignored = new 
SetThreadName(localInstance.getId().getFullId())) {
           dispatchLocally(localInstance);
         } catch (FragmentInstanceDispatchException e) {
-          dataNodeFailureList.add(e.getFailureStatus());
+          failedFragmentInstanceWithStatuses.add(
+              new FailedFragmentInstanceWithStatus(localInstance, 
e.getFailureStatus()));
         } catch (Throwable t) {
           LOGGER.warn(DISPATCH_FAILED, t);
-          dataNodeFailureList.add(
-              RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage()));
+          failedFragmentInstanceWithStatuses.add(
+              new FailedFragmentInstanceWithStatus(
+                  localInstance,
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage())));
         }
       }
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
           System.nanoTime() - localScheduleStartTime);
     }
-    // wait until remote dispatch done
-    try {
-      asyncPlanNodeSender.waitUntilCompleted();
-      final long maxRetryDurationInNs =
-          COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
-              ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
-              : 0;
-      if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
-        // retry failed remote FIs
-        int retryCount = 0;
-        long waitMillis = getRetrySleepTime(retryCount);
-        long retryStartTime = System.nanoTime();
-
-        while (asyncPlanNodeSender.needRetry()) {
-          retryCount++;
-          asyncPlanNodeSender.retry();
-          // if !(still need retry and current time + next sleep time < 
maxRetryDurationInNs)
-          if (!(asyncPlanNodeSender.needRetry()
-              && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L)
-                  < maxRetryDurationInNs)) {
-            break;
-          }
-          // still need to retry, sleep some time before make another retry.
-          Thread.sleep(waitMillis);
-          PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 
1_000_000L);
-          waitMillis = getRetrySleepTime(retryCount);
-        }
-      }
 
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOGGER.error("Interrupted when dispatching write async", e);
-      return immediateFuture(
-          new FragInstanceDispatchResult(
-              RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + 
e.getMessage())));
-    }
+    // 3. wait for remote dispatch results
+    asyncPlanNodeSender.waitUntilCompleted();
 
-    dataNodeFailureList.addAll(asyncPlanNodeSender.getFailureStatusList());
+    // 4. collect remote dispatch results
+    
failedFragmentInstanceWithStatuses.addAll(asyncPlanNodeSender.getFailedInstancesWithStatuses());
 
-    if (dataNodeFailureList.isEmpty()) {
-      return immediateFuture(new FragInstanceDispatchResult(true));
-    }
-    if (instances.size() == 1) {
-      return immediateFuture(new 
FragInstanceDispatchResult(dataNodeFailureList.get(0)));
-    } else {
-      List<TSStatus> failureStatusList = new ArrayList<>();
-      for (TSStatus dataNodeFailure : dataNodeFailureList) {
-        if (dataNodeFailure.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-          failureStatusList.addAll(dataNodeFailure.getSubStatus());
-        } else {
-          failureStatusList.add(dataNodeFailure);
-        }
+    return failedFragmentInstanceWithStatuses;
+  }
+
+  private List<FailedFragmentInstanceWithStatus> dispatchRetryWrite(
+      List<FragmentInstance> retriedInstances, int maxRetryAttempts) throws 
InterruptedException {
+    Preconditions.checkArgument(maxRetryAttempts > 0);
+
+    final long retryStartTime = System.nanoTime();
+    int retryAttempt = 0;
+    List<FragmentInstance> nextDispatch = new ArrayList<>(retriedInstances);
+    List<FailedFragmentInstanceWithStatus> failedFragmentInstanceWithStatuses =
+        Collections.emptyList();
+
+    while (retryAttempt < maxRetryAttempts) {
+      // 1. let's retry on next replica location
+      nextDispatch.forEach(FragmentInstance::getNextRetriedHostDataNode);
+
+      // 2. dispatch the instances
+      failedFragmentInstanceWithStatuses = dispatchWriteOnce(nextDispatch);
+
+      // 3. decide whether to continue retrying
+      final long waitMillis = getRetrySleepTime(retryAttempt);
+      if (failedFragmentInstanceWithStatuses.isEmpty()
+          || waitMillis + System.nanoTime() >= retryStartTime + 
maxRetryDurationInNs) {
+        break;
       }
-      return immediateFuture(new 
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+
+      // 4. sleep and do the next retry
+      Thread.sleep(waitMillis);
+      PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 
1_000_000L);
+      retryAttempt++;
+      nextDispatch =
+          failedFragmentInstanceWithStatuses.stream()
+              .map(FailedFragmentInstanceWithStatus::getInstance)
+              .collect(Collectors.toList());
     }
+
+    return failedFragmentInstanceWithStatuses;
   }
 
   private long getRetrySleepTime(int retryTimes) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 69e48494f7a..00df318bdf2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.exception.QuerySchemaFetchFailedException;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
 import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
@@ -156,7 +157,8 @@ public class ErrorHandlingUtils {
       return RpcUtils.getStatus(
           TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + rootCause.getMessage());
     } else if (t instanceof RootFIPlacementException
-        || t instanceof ReplicaSetUnreachableException) {
+        || t instanceof ReplicaSetUnreachableException
+        || t instanceof QuerySchemaFetchFailedException) {
       return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, rootCause.getMessage());
     } else if (t instanceof IoTDBException) {
       return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), rootCause.getMessage());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QuerySchemaFetchFailedException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QuerySchemaFetchFailedException.java
new file mode 100644
index 00000000000..096042a2bf2
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QuerySchemaFetchFailedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception;
+
+/**
+ * Failed to fetch schema via Coordinator during query execution. This is likely caused by network
+ * partition.
+ */
+public class QuerySchemaFetchFailedException extends IoTDBRuntimeException {
+  public QuerySchemaFetchFailedException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index 4ffa36604d6..70d3d3acb2c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -58,7 +58,6 @@ public class StatusUtils {
     NEED_RETRY.add(TSStatusCode.WAL_ERROR.getStatusCode());
     NEED_RETRY.add(TSStatusCode.DISK_SPACE_INSUFFICIENT.getStatusCode());
     NEED_RETRY.add(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
-    NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_TIME_OUT.getStatusCode());
     NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode());
     NEED_RETRY.add(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
     NEED_RETRY.add(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());

Reply via email to