This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch retry_write_local_FI in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6ea3e2d0813875ebf0c523e4e26ea4f57d4597d4 Author: HTHou <[email protected]> AuthorDate: Mon Feb 9 16:32:20 2026 +0800 [To dev/1.3] Retry dispatch failed local write FI remotely --- .../scheduler/FragmentInstanceDispatcherImpl.java | 85 ++++++++++++++-------- 1 file changed, 54 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 162ccf22a75..d92e60c4c44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -306,52 +306,45 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { asyncPlanNodeSender.sendAll(); if (!localInstances.isEmpty()) { + List<FragmentInstance> failedLocalInstances = new ArrayList<>(); // sync dispatch to local long localScheduleStartTime = System.nanoTime(); for (FragmentInstance localInstance : localInstances) { try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) { dispatchLocally(localInstance); } catch (FragmentInstanceDispatchException e) { - dataNodeFailureList.add(e.getFailureStatus()); + failedLocalInstances.add(localInstance); } catch (Throwable t) { LOGGER.warn(DISPATCH_FAILED, t); - dataNodeFailureList.add( - RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); + failedLocalInstances.add(localInstance); } } PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost( System.nanoTime() - localScheduleStartTime); + + // retry dispatch failed instance remotely + if (!failedLocalInstances.isEmpty()) { + AsyncPlanNodeSender failedPlanNodeSender = + new AsyncPlanNodeSender(asyncInternalServiceClientManager, failedLocalInstances); + failedPlanNodeSender.sendAll(); + try { + getRemoteDispatchResultAndRetry(failedPlanNodeSender); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Interrupted when dispatching write async", e); + return immediateFuture( + new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, + "Interrupted errors: " + e.getMessage()))); + } + + dataNodeFailureList.addAll(failedPlanNodeSender.getFailureStatusList()); + } } // wait until remote dispatch done try { - asyncPlanNodeSender.waitUntilCompleted(); - final long maxRetryDurationInNs = - COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0 - ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L - : 0; - if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) { - // retry failed remote FIs - int retryCount = 0; - long waitMillis = getRetrySleepTime(retryCount); - long retryStartTime = System.nanoTime(); - - while (asyncPlanNodeSender.needRetry()) { - retryCount++; - asyncPlanNodeSender.retry(); - // if !(still need retry and current time + next sleep time < maxRetryDurationInNs) - if (!(asyncPlanNodeSender.needRetry() - && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L) - < maxRetryDurationInNs)) { - break; - } - // still need to retry, sleep some time before make another retry. - Thread.sleep(waitMillis); - PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 1_000_000L); - waitMillis = getRetrySleepTime(retryCount); - } - } - + getRemoteDispatchResultAndRetry(asyncPlanNodeSender); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.error("Interrupted when dispatching write async", e); @@ -381,6 +374,36 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } } + private void getRemoteDispatchResultAndRetry(AsyncPlanNodeSender asyncPlanNodeSender) + throws InterruptedException { + asyncPlanNodeSender.waitUntilCompleted(); + final long maxRetryDurationInNs = + COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0 + ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L + : 0; + if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) { + // retry failed remote FIs + int retryCount = 0; + long waitMillis = getRetrySleepTime(retryCount); + long retryStartTime = System.nanoTime(); + + while (asyncPlanNodeSender.needRetry()) { + retryCount++; + asyncPlanNodeSender.retry(); + // if !(still need retry and current time + next sleep time < maxRetryDurationInNs) + if (!(asyncPlanNodeSender.needRetry() + && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L) + < maxRetryDurationInNs)) { + break; + } + // still need to retry, sleep some time before make another retry. + Thread.sleep(waitMillis); + PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 1_000_000L); + waitMillis = getRetrySleepTime(retryCount); + } + } + } + private long getRetrySleepTime(int retryTimes) { return Math.min( (long) (TimeUnit.MILLISECONDS.toMillis(100) * Math.pow(2, retryTimes)),
