This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/AddRetryConfig
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56bc0398afd4ca37857b8b76b5e8106c5fd80296
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jun 3 10:59:28 2024 +0800

    Add retry configuration
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  3 ++
 .../plan/scheduler/AsyncPlanNodeSender.java        |  6 ++--
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 35 ++++++++++++----------
 .../resources/conf/iotdb-system.properties         | 20 +++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 20 +++++++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       | 16 ++++++++++
 .../service/metric/PerformanceOverviewMetrics.java | 23 ++++++++++++++
 .../apache/iotdb/commons/utils/StatusUtils.java    | 22 +++++++++++---
 8 files changed, 123 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bbcbc0ed794..fd1068b1257 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1824,6 +1824,9 @@ public class IoTDBDescriptor {
 
       // update Consensus config
       reloadConsensusProps(properties);
+
+      // update retry config
+      commonDescriptor.loadRetryProperties(properties);
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload 
configuration because %s", e));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index c004356a5f4..230eb27941e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class AsyncPlanNodeSender {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(AsyncPlanNodeSender.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AsyncPlanNodeSender.class);
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
       asyncInternalServiceClientManager;
   private final List<FragmentInstance> instances;
@@ -116,14 +116,14 @@ public class AsyncPlanNodeSender {
       status = entry.getValue().getStatus();
       if (!entry.getValue().accepted) {
         if (status == null) {
-          logger.warn(
+          LOGGER.warn(
               "dispatch write failed. message: {}, node {}",
               entry.getValue().message,
               
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
           failureStatusList.add(
               RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, 
entry.getValue().getMessage()));
         } else {
-          logger.warn(
+          LOGGER.warn(
               "dispatch write failed. status: {}, code: {}, message: {}, node 
{}",
               entry.getValue().status,
               TSStatusCode.representOf(status.code),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index d401a6196f9..8564aab0a3e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
@@ -66,8 +68,11 @@ import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DIS
 
 public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher 
{
 
-  private static final Logger logger =
+  private static final Logger LOGGER =
       LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
+
+  private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
+
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
   private final QueryType type;
@@ -126,7 +131,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
       } catch (FragmentInstanceDispatchException e) {
         return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
       } catch (Throwable t) {
-        logger.warn(DISPATCH_FAILED, t);
+        LOGGER.warn(DISPATCH_FAILED, t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
@@ -165,7 +170,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           }
         }
       } catch (Throwable t) {
-        logger.warn(DISPATCH_FAILED, t);
+        LOGGER.warn(DISPATCH_FAILED, t);
         failureStatusList.add(
             RpcUtils.getStatus(
                 TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage()));
@@ -210,7 +215,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
         } catch (FragmentInstanceDispatchException e) {
           dataNodeFailureList.add(e.getFailureStatus());
         } catch (Throwable t) {
-          logger.warn(DISPATCH_FAILED, t);
+          LOGGER.warn(DISPATCH_FAILED, t);
           dataNodeFailureList.add(
               RpcUtils.getStatus(
                   TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage()));
@@ -222,11 +227,10 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     // wait until remote dispatch done
     try {
       asyncPlanNodeSender.waitUntilCompleted();
-
-      if (asyncPlanNodeSender.needRetry()) {
+      final int maxRetryTimes = COMMON_CONFIG.getRemoteWriteMaxRetryCount();
+      if (maxRetryTimes > 0 && asyncPlanNodeSender.needRetry()) {
         // retry failed remote FIs
         int retry = 0;
-        final int maxRetryTimes = 10;
         long waitMillis = getRetrySleepTime(retry);
 
         while (asyncPlanNodeSender.needRetry()) {
@@ -237,13 +241,14 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           }
           // still need to retry, sleep some time before make another retry.
           Thread.sleep(waitMillis);
+          PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 
1_000_000L);
           waitMillis = getRetrySleepTime(retry);
         }
       }
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      logger.error("Interrupted when dispatching write async", e);
+      LOGGER.error("Interrupted when dispatching write async", e);
       return immediateFuture(
           new FragInstanceDispatchResult(
               RpcUtils.getStatus(
@@ -308,7 +313,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           TSendFragmentInstanceResp sendFragmentInstanceResp =
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
-            logger.warn(sendFragmentInstanceResp.message);
+            LOGGER.warn(sendFragmentInstanceResp.message);
             if (sendFragmentInstanceResp.isSetNeedRetry()
                 && sendFragmentInstanceResp.isNeedRetry()) {
               throw new 
RatisReadUnavailableException(sendFragmentInstanceResp.message);
@@ -330,7 +335,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           TSendSinglePlanNodeResp sendPlanNodeResp =
               client.sendBatchPlanNode(sendPlanNodeReq).getResponses().get(0);
           if (!sendPlanNodeResp.accepted) {
-            logger.warn(
+            LOGGER.warn(
                 "dispatch write failed. status: {}, code: {}, message: {}, 
node {}",
                 sendPlanNodeResp.status,
                 TSStatusCode.representOf(sendPlanNodeResp.status.code),
@@ -366,7 +371,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     try {
       dispatchRemoteHelper(instance, endPoint);
     } catch (ClientManagerException | TException | 
RatisReadUnavailableException e) {
-      logger.warn(
+      LOGGER.warn(
           "can't execute request on node {}, error msg is {}, and we try to 
reconnect this node.",
           endPoint,
           ExceptionUtils.getRootCause(e).toString());
@@ -374,7 +379,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
       try {
         dispatchRemoteHelper(instance, endPoint);
       } catch (ClientManagerException | TException | 
RatisReadUnavailableException e1) {
-        logger.warn(
+        LOGGER.warn(
             "can't execute request on node  {} in second try, error msg is 
{}.",
             endPoint,
             ExceptionUtils.getRootCause(e1).toString());
@@ -398,7 +403,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
             ConsensusGroupId.Factory.createFromTConsensusGroupId(
                 instance.getRegionReplicaSet().getRegionId());
       } catch (Throwable t) {
-        logger.warn("Deserialize ConsensusGroupId failed. ", t);
+        LOGGER.warn("Deserialize ConsensusGroupId failed. ", t);
         throw new FragmentInstanceDispatchException(
             RpcUtils.getStatus(
                 TSStatusCode.EXECUTE_STATEMENT_ERROR,
@@ -414,7 +419,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
                 ? readExecutor.execute(instance)
                 : readExecutor.execute(groupId, instance);
         if (!readResult.isAccepted()) {
-          logger.warn(readResult.getMessage());
+          LOGGER.warn(readResult.getMessage());
           throw new FragmentInstanceDispatchException(
               RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
readResult.getMessage()));
         }
@@ -426,7 +431,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
         if (!writeResult.isAccepted()) {
           // DO NOT LOG READ_ONLY ERROR
           if (writeResult.getStatus().getCode() != 
TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) {
-            logger.warn(
+            LOGGER.warn(
                 "write locally failed. TSStatus: {}, message: {}",
                 writeResult.getStatus(),
                 writeResult.getMessage());
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
index b92142ebe63..c6b4700307a 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
@@ -1849,3 +1849,23 @@ data_replication_factor=1
 # Default value is -1, which means no limit.
 # Datatype: int
 # load_write_throughput_bytes_per_second=-1
+
+
+
+####################
+### Retry Configuration
+####################
+
+# The maximum number of retries for write requests that are dispatched remotely.
+# It only takes effect for remotely dispatched write requests, not for locally dispatched ones or queries.
+# Set it to 0 or a negative number to disable retrying remotely dispatched write requests.
+# Between retries we sleep for an increasing interval: 100ms, 200ms, 400ms, 800ms and so on, until it reaches 20,000ms, after which it no longer grows.
+# effectiveMode: hot_reload
+# Datatype: int
+# write_request_remote_dispatch_max_retry_count=10
+
+# Whether to retry on unknown errors.
+# Currently, unknown errors include EXECUTE_STATEMENT_ERROR(301) and INTERNAL_SERVER_ERROR(305).
+# effectiveMode: hot_reload
+# Datatype: boolean
+# enable_retry_for_unknown_error=true
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b78f7dcb6fb..0b097998692 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -280,6 +280,10 @@ public class CommonConfig {
   private final Set<String> enabledKillPoints =
       
KillPoint.parseKillPoints(System.getProperty(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS));
 
+  private volatile boolean retryForUnknownErrors = true;
+
+  private volatile int remoteWriteMaxRetryCount = 10;
+
   CommonConfig() {
     // Empty constructor
   }
@@ -1210,4 +1214,20 @@ public class CommonConfig {
   public Set<String> getEnabledKillPoints() {
     return enabledKillPoints;
   }
+
+  public boolean isRetryForUnknownErrors() { // property: enable_retry_for_unknown_error
+    return retryForUnknownErrors;
+  }
+
+  public void setRetryForUnknownErrors(boolean retryForUnknownErrors) { // set by loadRetryProperties
+    this.retryForUnknownErrors = retryForUnknownErrors;
+  }
+
+  public int getRemoteWriteMaxRetryCount() { // <= 0 disables remote write dispatch retrying
+    return remoteWriteMaxRetryCount;
+  }
+
+  public void setRemoteWriteMaxRetryCount(int remoteWriteMaxRetryCount) { // set by loadRetryProperties
+    this.remoteWriteMaxRetryCount = remoteWriteMaxRetryCount;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5979a6f56e5..af6b666823c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -246,6 +246,8 @@ public class CommonDescriptor {
             properties.getProperty(
                 "cluster_device_limit_threshold",
                 String.valueOf(config.getDeviceLimitThreshold()))));
+
+    loadRetryProperties(properties);
   }
 
   private void loadPipeProps(Properties properties) {
@@ -614,6 +616,20 @@ public class CommonDescriptor {
                 String.valueOf(config.getSubscriptionReadFileBufferSize()))));
   }
 
+  public void loadRetryProperties(Properties properties) { // absent keys keep current values
+    config.setRemoteWriteMaxRetryCount(
+        Integer.parseInt( // throws NumberFormatException for a non-integer value
+            properties.getProperty(
+                "write_request_remote_dispatch_max_retry_count",
+                String.valueOf(config.getRemoteWriteMaxRetryCount()))));
+    // Boolean.parseBoolean treats any value other than "true" (ignoring case) as false.
+    config.setRetryForUnknownErrors(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_retry_for_unknown_error",
+                String.valueOf(config.isRetryForUnknownErrors()))));
+  }
+
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
     config.setTimestampPrecision(globalConfig.timestampPrecision);
     config.setTimePartitionInterval(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
index 1efbe626335..263343c3406 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
@@ -112,6 +112,8 @@ public class PerformanceOverviewMetrics implements 
IMetricSet {
   private static final String LOCAL_SCHEDULE = "local_scheduler";
   private static final String REMOTE_SCHEDULE = "remote_scheduler";
 
+  private static final String REMOTE_RETRY_SLEEP = "remote_retry";
+
   static {
     metricInfoMap.put(
         LOCAL_SCHEDULE,
@@ -127,11 +129,20 @@ public class PerformanceOverviewMetrics implements 
IMetricSet {
             PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
             Tag.STAGE.toString(),
             REMOTE_SCHEDULE));
+    metricInfoMap.put(
+        REMOTE_RETRY_SLEEP,
+        new MetricInfo(
+            MetricType.TIMER,
+            PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
+            Tag.STAGE.toString(),
+            REMOTE_RETRY_SLEEP));
   }
 
   private Timer localScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer remoteScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
+  private Timer remoteRetrySleepTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
   /** Record the time cost of local schedule. */
   public void recordScheduleLocalCost(long costTimeInNanos) {
     localScheduleTimer.updateNanos(costTimeInNanos);
@@ -142,6 +153,11 @@ public class PerformanceOverviewMetrics implements 
IMetricSet {
     remoteScheduleTimer.updateNanos(costTimeInNanos);
   }
 
+  /** Record the time slept before retrying a remote write dispatch, in nanoseconds. */
+  public void recordRemoteRetrySleepCost(long costTimeInNanos) {
+    remoteRetrySleepTimer.updateNanos(costTimeInNanos);
+  }
+
   // endregion
 
   // region local schedule
@@ -327,6 +343,13 @@ public class PerformanceOverviewMetrics implements 
IMetricSet {
             MetricLevel.CORE,
             Tag.STAGE.toString(),
             REMOTE_SCHEDULE);
+    remoteRetrySleepTimer =
+        metricService.getOrCreateTimer(
+            PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
+            MetricLevel.CORE,
+            Tag.STAGE.toString(),
+            REMOTE_RETRY_SLEEP);
+
     // bind local schedule metrics
     schemaValidateTimer =
         metricService.getOrCreateTimer(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index be32bf5a459..97811bc76a9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -40,11 +40,16 @@ public class StatusUtils {
 
   private static final Set<Integer> NEED_RETRY = new HashSet<>();
 
+  private static final Set<Integer> UNKNOWN_ERRORS = new HashSet<>();
+
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
   static {
-    NEED_RETRY.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-    NEED_RETRY.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    // UNKNOWN ERRORS
+    UNKNOWN_ERRORS.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    UNKNOWN_ERRORS.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+
+    // KNOWN ERRORS
     NEED_RETRY.add(TSStatusCode.DISPATCH_ERROR.getStatusCode());
     NEED_RETRY.add(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
     NEED_RETRY.add(TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode());
@@ -213,14 +218,23 @@ public class StatusUtils {
     int code = status.getCode();
     if (code == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
       for (TSStatus subStatus : status.subStatus) {
+        // any sub codes for MULTIPLE_ERROR don't need to retry, we won't 
retry for the whole
+        // request
         if (subStatus == null
-            || (subStatus.getCode() != OK.code && 
!NEED_RETRY.contains(subStatus.getCode()))) {
+            || (subStatus.getCode() != OK.code
+                && !needRetryHelperForSingleStatus(subStatus.getCode()))) {
           return false;
         }
       }
       return true;
     } else {
-      return NEED_RETRY.contains(code);
+      return needRetryHelperForSingleStatus(code);
     }
   }
+
+  // Single (non-MULTIPLE_ERROR) code: NEED_RETRY codes, plus UNKNOWN_ERRORS when enabled.
+  private static boolean needRetryHelperForSingleStatus(int statusCode) {
+    return NEED_RETRY.contains(statusCode)
+        || (COMMON_CONFIG.isRetryForUnknownErrors() && 
UNKNOWN_ERRORS.contains(statusCode));
+  }
 }

Reply via email to