This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 21421eca4b4 [To dev/1.3] Retry dispatch failed local write FI remotely (#17187)
21421eca4b4 is described below
commit 21421eca4b4268bc3e75c4091e8eb3b738b15864
Author: Haonan <[email protected]>
AuthorDate: Tue Feb 10 09:34:03 2026 +0800
[To dev/1.3] Retry dispatch failed local write FI remotely (#17187)
* [To dev/1.3] Retry dispatch failed local write FI remotely
* fix try logic
---
.../scheduler/FragmentInstanceDispatcherImpl.java | 97 +++++++++++++++-------
1 file changed, 67 insertions(+), 30 deletions(-)
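In short, the change makes local write dispatch fall back to a remote dispatch when the local attempt fails with a retryable status and the target region has more than one replica; only non-retryable failures, or failures on single-replica regions, are reported directly. Below is a minimal, self-contained sketch of that decision rule. The Status and Instance records and the dispatchLocally helper are simplified stand-ins for IoTDB's TSStatus, FragmentInstance, and dispatch logic, not the real classes, and the replica/retryable checks mirror the condition added in the diff that follows.

    // Hypothetical, simplified sketch of the fallback rule introduced by this commit.
    import java.util.ArrayList;
    import java.util.List;

    public class LocalDispatchFallbackSketch {

      // Stand-ins for IoTDB's TSStatus / FragmentInstance (assumed shapes, for illustration only).
      record Status(int code, boolean retryable) {}
      record Instance(String id, int replicaCount) {}

      public static void main(String[] args) {
        List<Instance> localInstances =
            List.of(new Instance("fi-1", 3), new Instance("fi-2", 1));

        List<Instance> instancesNeedRemoteRetry = new ArrayList<>();
        List<Status> dataNodeFailureList = new ArrayList<>();

        for (Instance instance : localInstances) {
          Status failure = dispatchLocally(instance); // pretend both local dispatches fail
          // Core rule of the commit: only fall back to a remote retry when the region
          // has more than one replica AND the failure status is considered retryable.
          if (instance.replicaCount() > 1 && failure.retryable()) {
            instancesNeedRemoteRetry.add(instance);
          } else {
            dataNodeFailureList.add(failure);
          }
        }

        System.out.println("retry remotely: " + instancesNeedRemoteRetry);
        System.out.println("report as failed: " + dataNodeFailureList);
      }

      // Hypothetical local dispatch that always fails with a retryable status.
      private static Status dispatchLocally(Instance instance) {
        return new Status(500, true);
      }
    }

In the actual patch, the collected instances are handed to a new AsyncPlanNodeSender and go through the same wait-and-retry path as ordinary remote dispatches (the extracted getRemoteDispatchResultAndRetry method shown in the diff).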
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 162ccf22a75..7bcdd1fb694 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -306,52 +307,58 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
asyncPlanNodeSender.sendAll();
if (!localInstances.isEmpty()) {
+ List<FragmentInstance> instancesNeedRemoteRetry = new ArrayList<>();
// sync dispatch to local
long localScheduleStartTime = System.nanoTime();
for (FragmentInstance localInstance : localInstances) {
try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
dispatchLocally(localInstance);
} catch (FragmentInstanceDispatchException e) {
- dataNodeFailureList.add(e.getFailureStatus());
+ if (localInstance.getRegionReplicaSet().dataNodeLocations.size() > 1
+ && StatusUtils.needRetryHelper(e.getFailureStatus())) {
+ instancesNeedRemoteRetry.add(localInstance);
+ } else {
+ dataNodeFailureList.add(e.getFailureStatus());
+ }
} catch (Throwable t) {
LOGGER.warn(DISPATCH_FAILED, t);
- dataNodeFailureList.add(
+ TSStatus status =
RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage());
+ if (localInstance.getRegionReplicaSet().dataNodeLocations.size() > 1
+ && StatusUtils.needRetryHelper(status)) {
+ instancesNeedRemoteRetry.add(localInstance);
+ } else {
+ dataNodeFailureList.add(status);
+ }
}
}
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
System.nanoTime() - localScheduleStartTime);
+
+ // retry dispatch failed instance remotely
+ if (!instancesNeedRemoteRetry.isEmpty()) {
+ AsyncPlanNodeSender remoteRetrySender =
+ new AsyncPlanNodeSender(asyncInternalServiceClientManager, instancesNeedRemoteRetry);
+ remoteRetrySender.sendAll();
+ try {
+ getRemoteDispatchResultAndRetry(remoteRetrySender);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted when dispatching write async", e);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR,
+ "Interrupted errors: " + e.getMessage())));
+ }
+
+ dataNodeFailureList.addAll(remoteRetrySender.getFailureStatusList());
+ }
}
// wait until remote dispatch done
try {
- asyncPlanNodeSender.waitUntilCompleted();
- final long maxRetryDurationInNs =
- COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
- ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
- : 0;
- if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
- // retry failed remote FIs
- int retryCount = 0;
- long waitMillis = getRetrySleepTime(retryCount);
- long retryStartTime = System.nanoTime();
-
- while (asyncPlanNodeSender.needRetry()) {
- retryCount++;
- asyncPlanNodeSender.retry();
- // if !(still need retry and current time + next sleep time < maxRetryDurationInNs)
- if (!(asyncPlanNodeSender.needRetry()
- && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L) < maxRetryDurationInNs)) {
- break;
- }
- // still need to retry, sleep some time before make another retry.
- Thread.sleep(waitMillis);
- PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 1_000_000L);
- waitMillis = getRetrySleepTime(retryCount);
- }
- }
-
+ getRemoteDispatchResultAndRetry(asyncPlanNodeSender);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error("Interrupted when dispatching write async", e);
@@ -381,6 +388,36 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
}
}
+ private void getRemoteDispatchResultAndRetry(AsyncPlanNodeSender asyncPlanNodeSender)
+ throws InterruptedException {
+ asyncPlanNodeSender.waitUntilCompleted();
+ final long maxRetryDurationInNs =
+ COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
+ ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
+ : 0;
+ if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
+ // retry failed remote FIs
+ int retryCount = 0;
+ long waitMillis = getRetrySleepTime(retryCount);
+ long retryStartTime = System.nanoTime();
+
+ while (asyncPlanNodeSender.needRetry()) {
+ retryCount++;
+ asyncPlanNodeSender.retry();
+ // if !(still need retry and current time + next sleep time < maxRetryDurationInNs)
+ if (!(asyncPlanNodeSender.needRetry()
+ && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L) < maxRetryDurationInNs)) {
+ break;
+ }
+ // still need to retry, sleep some time before make another retry.
+ Thread.sleep(waitMillis);
+ PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 1_000_000L);
+ waitMillis = getRetrySleepTime(retryCount);
+ }
+ }
+ }
+
private long getRetrySleepTime(int retryTimes) {
return Math.min(
(long) (TimeUnit.MILLISECONDS.toMillis(100) * Math.pow(2, retryTimes)),
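The getRetrySleepTime method above computes the sleep between remote retries as a capped exponential backoff: 100 ms doubled per attempt, bounded by Math.min against a maximum that is cut off in this diff. A minimal sketch of that schedule follows; the 20_000 ms cap is an assumption, since the second argument of Math.min is not visible here.

    // Minimal sketch of the capped exponential backoff used between remote retries.
    public class RetryBackoffSketch {

      private static final long BASE_SLEEP_MS = 100;    // base value taken from the diff
      private static final long MAX_SLEEP_MS = 20_000;  // assumed cap, not shown in the diff

      static long getRetrySleepTime(int retryTimes) {
        return Math.min((long) (BASE_SLEEP_MS * Math.pow(2, retryTimes)), MAX_SLEEP_MS);
      }

      public static void main(String[] args) {
        // Prints the sleep schedule: 100, 200, 400, ... ms until it saturates at the cap.
        for (int retry = 0; retry < 10; retry++) {
          System.out.printf("retry %d -> sleep %d ms%n", retry, getRetrySleepTime(retry));
        }
      }
    }

Together with the maxRetryDurationInNs budget checked in the retry loop, this backoff bounds how long a single write dispatch can spend sleeping between retries.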