This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 21421eca4b4 [To dev/1.3] Retry dispatch failed local write FI remotely (#17187)
21421eca4b4 is described below

commit 21421eca4b4268bc3e75c4091e8eb3b738b15864
Author: Haonan <[email protected]>
AuthorDate: Tue Feb 10 09:34:03 2026 +0800

    [To dev/1.3] Retry dispatch failed local write FI remotely (#17187)
    
    * [To dev/1.3] Retry dispatch failed local write FI remotely
    
    * fix try logic
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 97 +++++++++++++++-------
 1 file changed, 67 insertions(+), 30 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 162ccf22a75..7bcdd1fb694 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -306,52 +307,58 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     asyncPlanNodeSender.sendAll();
 
     if (!localInstances.isEmpty()) {
+      List<FragmentInstance> instancesNeedRemoteRetry = new ArrayList<>();
       // sync dispatch to local
       long localScheduleStartTime = System.nanoTime();
       for (FragmentInstance localInstance : localInstances) {
         try (SetThreadName threadName = new 
SetThreadName(localInstance.getId().getFullId())) {
           dispatchLocally(localInstance);
         } catch (FragmentInstanceDispatchException e) {
-          dataNodeFailureList.add(e.getFailureStatus());
+          if (localInstance.getRegionReplicaSet().dataNodeLocations.size() > 1
+              && StatusUtils.needRetryHelper(e.getFailureStatus())) {
+            instancesNeedRemoteRetry.add(localInstance);
+          } else {
+            dataNodeFailureList.add(e.getFailureStatus());
+          }
         } catch (Throwable t) {
           LOGGER.warn(DISPATCH_FAILED, t);
-          dataNodeFailureList.add(
+          TSStatus status =
               RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage()));
+                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage());
+          if (localInstance.getRegionReplicaSet().dataNodeLocations.size() > 1
+              && StatusUtils.needRetryHelper(status)) {
+            instancesNeedRemoteRetry.add(localInstance);
+          } else {
+            dataNodeFailureList.add(status);
+          }
         }
       }
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
           System.nanoTime() - localScheduleStartTime);
+
+      // retry dispatch failed instance remotely
+      if (!instancesNeedRemoteRetry.isEmpty()) {
+        AsyncPlanNodeSender remoteRetrySender =
+            new AsyncPlanNodeSender(asyncInternalServiceClientManager, 
instancesNeedRemoteRetry);
+        remoteRetrySender.sendAll();
+        try {
+          getRemoteDispatchResultAndRetry(remoteRetrySender);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOGGER.error("Interrupted when dispatching write async", e);
+          return immediateFuture(
+              new FragInstanceDispatchResult(
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR,
+                      "Interrupted errors: " + e.getMessage())));
+        }
+
+        dataNodeFailureList.addAll(remoteRetrySender.getFailureStatusList());
+      }
     }
     // wait until remote dispatch done
     try {
-      asyncPlanNodeSender.waitUntilCompleted();
-      final long maxRetryDurationInNs =
-          COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
-              ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
-              : 0;
-      if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
-        // retry failed remote FIs
-        int retryCount = 0;
-        long waitMillis = getRetrySleepTime(retryCount);
-        long retryStartTime = System.nanoTime();
-
-        while (asyncPlanNodeSender.needRetry()) {
-          retryCount++;
-          asyncPlanNodeSender.retry();
-          // if !(still need retry and current time + next sleep time < 
maxRetryDurationInNs)
-          if (!(asyncPlanNodeSender.needRetry()
-              && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L)
-                  < maxRetryDurationInNs)) {
-            break;
-          }
-          // still need to retry, sleep some time before make another retry.
-          Thread.sleep(waitMillis);
-          PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 
1_000_000L);
-          waitMillis = getRetrySleepTime(retryCount);
-        }
-      }
-
+      getRemoteDispatchResultAndRetry(asyncPlanNodeSender);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error("Interrupted when dispatching write async", e);
@@ -381,6 +388,36 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     }
   }
 
+  /**
+   * Waits for the dispatches issued by {@code asyncPlanNodeSender} to complete, then re-sends the
+   * fragment instances that still need a retry until none do or the retry budget runs out.
+   *
+   * <p>Retries only happen when {@code COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs()} is
+   * positive. The first retry is sent immediately; later ones are spaced by {@link
+   * #getRetrySleepTime(int)}, and the loop stops once the elapsed retry time plus the next sleep
+   * would reach that budget. Remaining failures are not returned here: callers read them from the
+   * sender afterwards.
+   *
+   * <p>NOTE(review): despite the {@code get} prefix, this method returns nothing.
+   *
+   * @param asyncPlanNodeSender sender whose in-flight dispatches are awaited and retried
+   * @throws InterruptedException if interrupted while sleeping between retries (and presumably
+   *     while blocked in {@code waitUntilCompleted()})
+   */
+  private void getRemoteDispatchResultAndRetry(AsyncPlanNodeSender 
asyncPlanNodeSender)
+      throws InterruptedException {
+    asyncPlanNodeSender.waitUntilCompleted();
+    // Retry budget in nanoseconds; a non-positive config value maps to 0, which disables retrying.
+    final long maxRetryDurationInNs =
+        COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
+            ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
+            : 0;
+    if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
+      // retry failed remote FIs
+      int retryCount = 0;
+      long waitMillis = getRetrySleepTime(retryCount);
+      long retryStartTime = System.nanoTime();
+
+      // Once here, at least one retry is always sent (without sleeping first), even if the
+      // budget is smaller than the first back-off interval.
+      while (asyncPlanNodeSender.needRetry()) {
+        retryCount++;
+        asyncPlanNodeSender.retry();
+        // stop unless a retry is still needed AND (elapsed retry time + next sleep) < maxRetryDurationInNs
+        if (!(asyncPlanNodeSender.needRetry()
+            && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L)
+                < maxRetryDurationInNs)) {
+          break;
+        }
+        // still need to retry: sleep before making another attempt, and record the sleep cost.
+        Thread.sleep(waitMillis);
+        PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 
1_000_000L);
+        // the next interval is derived from the number of retries sent so far
+        waitMillis = getRetrySleepTime(retryCount);
+      }
+    }
+  }
+
   private long getRetrySleepTime(int retryTimes) {
     return Math.min(
         (long) (TimeUnit.MILLISECONDS.toMillis(100) * Math.pow(2, retryTimes)),

Reply via email to