This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/AddRetryConfig in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56bc0398afd4ca37857b8b76b5e8106c5fd80296 Author: JackieTien97 <[email protected]> AuthorDate: Mon Jun 3 10:59:28 2024 +0800 Add retry configuration --- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 3 ++ .../plan/scheduler/AsyncPlanNodeSender.java | 6 ++-- .../scheduler/FragmentInstanceDispatcherImpl.java | 35 ++++++++++++---------- .../resources/conf/iotdb-system.properties | 20 +++++++++++++ .../apache/iotdb/commons/conf/CommonConfig.java | 20 +++++++++++++ .../iotdb/commons/conf/CommonDescriptor.java | 16 ++++++++++ .../service/metric/PerformanceOverviewMetrics.java | 23 ++++++++++++++ .../apache/iotdb/commons/utils/StatusUtils.java | 22 +++++++++++--- 8 files changed, 123 insertions(+), 22 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index bbcbc0ed794..fd1068b1257 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1824,6 +1824,9 @@ public class IoTDBDescriptor { // update Consensus config reloadConsensusProps(properties); + + // update retry config + commonDescriptor.loadRetryProperties(properties); } catch (Exception e) { throw new QueryProcessException(String.format("Fail to reload configuration because %s", e)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java index c004356a5f4..230eb27941e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java @@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicLong; public class 
AsyncPlanNodeSender { - private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncPlanNodeSender.class); private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncInternalServiceClientManager; private final List<FragmentInstance> instances; @@ -116,14 +116,14 @@ public class AsyncPlanNodeSender { status = entry.getValue().getStatus(); if (!entry.getValue().accepted) { if (status == null) { - logger.warn( + LOGGER.warn( "dispatch write failed. message: {}, node {}", entry.getValue().message, instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); failureStatusList.add( RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage())); } else { - logger.warn( + LOGGER.warn( "dispatch write failed. status: {}, code: {}, message: {}, node {}", entry.getValue().status, TSStatusCode.representOf(status.code), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index d401a6196f9..8564aab0a3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import 
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; import org.apache.iotdb.consensus.exception.RatisReadUnavailableException; @@ -66,8 +68,11 @@ import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DIS public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { - private static final Logger logger = + private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class); + + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + private final ExecutorService executor; private final ExecutorService writeOperationExecutor; private final QueryType type; @@ -126,7 +131,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } catch (FragmentInstanceDispatchException e) { return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); } catch (Throwable t) { - logger.warn(DISPATCH_FAILED, t); + LOGGER.warn(DISPATCH_FAILED, t); return immediateFuture( new FragInstanceDispatchResult( RpcUtils.getStatus( @@ -165,7 +170,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } } } catch (Throwable t) { - logger.warn(DISPATCH_FAILED, t); + LOGGER.warn(DISPATCH_FAILED, t); failureStatusList.add( RpcUtils.getStatus( TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); @@ -210,7 +215,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } catch (FragmentInstanceDispatchException e) { dataNodeFailureList.add(e.getFailureStatus()); } catch (Throwable t) { - logger.warn(DISPATCH_FAILED, t); + LOGGER.warn(DISPATCH_FAILED, t); dataNodeFailureList.add( RpcUtils.getStatus( TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); @@ -222,11 +227,10 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { // wait until remote dispatch done try { asyncPlanNodeSender.waitUntilCompleted(); - 
- if (asyncPlanNodeSender.needRetry()) { + final int maxRetryTimes = COMMON_CONFIG.getRemoteWriteMaxRetryCount(); + if (maxRetryTimes > 0 && asyncPlanNodeSender.needRetry()) { // retry failed remote FIs int retry = 0; - final int maxRetryTimes = 10; long waitMillis = getRetrySleepTime(retry); while (asyncPlanNodeSender.needRetry()) { @@ -237,13 +241,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } // still need to retry, sleep some time before make another retry. Thread.sleep(waitMillis); + PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 1_000_000L); waitMillis = getRetrySleepTime(retry); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.error("Interrupted when dispatching write async", e); + LOGGER.error("Interrupted when dispatching write async", e); return immediateFuture( new FragInstanceDispatchResult( RpcUtils.getStatus( @@ -308,7 +313,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSendFragmentInstanceResp sendFragmentInstanceResp = client.sendFragmentInstance(sendFragmentInstanceReq); if (!sendFragmentInstanceResp.accepted) { - logger.warn(sendFragmentInstanceResp.message); + LOGGER.warn(sendFragmentInstanceResp.message); if (sendFragmentInstanceResp.isSetNeedRetry() && sendFragmentInstanceResp.isNeedRetry()) { throw new RatisReadUnavailableException(sendFragmentInstanceResp.message); @@ -330,7 +335,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSendSinglePlanNodeResp sendPlanNodeResp = client.sendBatchPlanNode(sendPlanNodeReq).getResponses().get(0); if (!sendPlanNodeResp.accepted) { - logger.warn( + LOGGER.warn( "dispatch write failed. 
status: {}, code: {}, message: {}, node {}", sendPlanNodeResp.status, TSStatusCode.representOf(sendPlanNodeResp.status.code), @@ -366,7 +371,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { try { dispatchRemoteHelper(instance, endPoint); } catch (ClientManagerException | TException | RatisReadUnavailableException e) { - logger.warn( + LOGGER.warn( "can't execute request on node {}, error msg is {}, and we try to reconnect this node.", endPoint, ExceptionUtils.getRootCause(e).toString()); @@ -374,7 +379,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { try { dispatchRemoteHelper(instance, endPoint); } catch (ClientManagerException | TException | RatisReadUnavailableException e1) { - logger.warn( + LOGGER.warn( "can't execute request on node {} in second try, error msg is {}.", endPoint, ExceptionUtils.getRootCause(e1).toString()); @@ -398,7 +403,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { ConsensusGroupId.Factory.createFromTConsensusGroupId( instance.getRegionReplicaSet().getRegionId()); } catch (Throwable t) { - logger.warn("Deserialize ConsensusGroupId failed. ", t); + LOGGER.warn("Deserialize ConsensusGroupId failed. ", t); throw new FragmentInstanceDispatchException( RpcUtils.getStatus( TSStatusCode.EXECUTE_STATEMENT_ERROR, @@ -414,7 +419,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { ? 
readExecutor.execute(instance) : readExecutor.execute(groupId, instance); if (!readResult.isAccepted()) { - logger.warn(readResult.getMessage()); + LOGGER.warn(readResult.getMessage()); throw new FragmentInstanceDispatchException( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, readResult.getMessage())); } @@ -426,7 +431,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { if (!writeResult.isAccepted()) { // DO NOT LOG READ_ONLY ERROR if (writeResult.getStatus().getCode() != TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) { - logger.warn( + LOGGER.warn( "write locally failed. TSStatus: {}, message: {}", writeResult.getStatus(), writeResult.getMessage()); diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties index b92142ebe63..c6b4700307a 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties @@ -1849,3 +1849,23 @@ data_replication_factor=1 # Default value is -1, which means no limit. # Datatype: int # load_write_throughput_bytes_per_second=-1 + + + +#################### +### Retry Configuration +#################### + +# The maximum times for retrying write request remotely dispatching. +# It only takes effect for write request remotely dispatching, not including locally dispatching and query +# Set to 0 or negative number to disable remote dispatching write request retrying +# We will sleep for some time between each retry, 100ms, 200ms, 400ms, 800ms and so on, until reaching 20,000ms, we won't increase the sleeping time any more +# effectiveMode: hot_reload +# Datatype: int +# write_request_remote_dispatch_max_retry_count=10 + +# Whether retrying for unknown errors. 
+# Current unknown errors include EXECUTE_STATEMENT_ERROR(301) and INTERNAL_SERVER_ERROR(305) +# effectiveMode: hot_reload +# Datatype: boolean +# enable_retry_for_unknown_error=true diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index b78f7dcb6fb..0b097998692 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -280,6 +280,10 @@ public class CommonConfig { private final Set<String> enabledKillPoints = KillPoint.parseKillPoints(System.getProperty(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS)); + private volatile boolean retryForUnknownErrors = true; + + private volatile int remoteWriteMaxRetryCount = 10; + CommonConfig() { // Empty constructor } @@ -1210,4 +1214,20 @@ public class CommonConfig { public Set<String> getEnabledKillPoints() { return enabledKillPoints; } + + public boolean isRetryForUnknownErrors() { + return retryForUnknownErrors; + } + + public void setRetryForUnknownErrors(boolean retryForUnknownErrors) { + this.retryForUnknownErrors = retryForUnknownErrors; + } + + public int getRemoteWriteMaxRetryCount() { + return remoteWriteMaxRetryCount; + } + + public void setRemoteWriteMaxRetryCount(int remoteWriteMaxRetryCount) { + this.remoteWriteMaxRetryCount = remoteWriteMaxRetryCount; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 5979a6f56e5..af6b666823c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -246,6 +246,8 @@ public class CommonDescriptor { 
properties.getProperty( "cluster_device_limit_threshold", String.valueOf(config.getDeviceLimitThreshold())))); + + loadRetryProperties(properties); } private void loadPipeProps(Properties properties) { @@ -614,6 +616,20 @@ public class CommonDescriptor { String.valueOf(config.getSubscriptionReadFileBufferSize())))); } + public void loadRetryProperties(Properties properties) { + config.setRemoteWriteMaxRetryCount( + Integer.parseInt( + properties.getProperty( + "write_request_remote_dispatch_max_retry_count", + String.valueOf(config.getRemoteWriteMaxRetryCount())))); + + config.setRetryForUnknownErrors( + Boolean.parseBoolean( + properties.getProperty( + "enable_retry_for_unknown_error", + String.valueOf(config.isRetryForUnknownErrors())))); + } + public void loadGlobalConfig(TGlobalConfig globalConfig) { config.setTimestampPrecision(globalConfig.timestampPrecision); config.setTimePartitionInterval( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java index 1efbe626335..263343c3406 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java @@ -112,6 +112,8 @@ public class PerformanceOverviewMetrics implements IMetricSet { private static final String LOCAL_SCHEDULE = "local_scheduler"; private static final String REMOTE_SCHEDULE = "remote_scheduler"; + private static final String REMOTE_RETRY_SLEEP = "remote_retry"; + static { metricInfoMap.put( LOCAL_SCHEDULE, @@ -127,11 +129,20 @@ public class PerformanceOverviewMetrics implements IMetricSet { PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL, Tag.STAGE.toString(), REMOTE_SCHEDULE)); + metricInfoMap.put( + REMOTE_RETRY_SLEEP, + new MetricInfo( + MetricType.TIMER, + 
PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL, + Tag.STAGE.toString(), + REMOTE_RETRY_SLEEP)); } private Timer localScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer remoteScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer remoteRetrySleepTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + /** Record the time cost of local schedule. */ public void recordScheduleLocalCost(long costTimeInNanos) { localScheduleTimer.updateNanos(costTimeInNanos); @@ -142,6 +153,11 @@ public class PerformanceOverviewMetrics implements IMetricSet { remoteScheduleTimer.updateNanos(costTimeInNanos); } + /** Record the time spent sleeping between remote dispatch retries. */ + public void recordRemoteRetrySleepCost(long costTimeInNanos) { + remoteRetrySleepTimer.updateNanos(costTimeInNanos); + } + // endregion // region local schedule @@ -327,6 +343,13 @@ public class PerformanceOverviewMetrics implements IMetricSet { MetricLevel.CORE, Tag.STAGE.toString(), REMOTE_SCHEDULE); + remoteRetrySleepTimer = + metricService.getOrCreateTimer( + PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL, + MetricLevel.CORE, + Tag.STAGE.toString(), + REMOTE_RETRY_SLEEP); + // bind local schedule metrics schemaValidateTimer = metricService.getOrCreateTimer( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index be32bf5a459..97811bc76a9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -40,11 +40,16 @@ public class StatusUtils { private static final Set<Integer> NEED_RETRY = new HashSet<>(); + private static final Set<Integer> UNKNOWN_ERRORS = new HashSet<>(); + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); static { - NEED_RETRY.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - 
NEED_RETRY.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + // UNKNOWN ERRORS + UNKNOWN_ERRORS.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + UNKNOWN_ERRORS.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + + // KNOWN ERRORS NEED_RETRY.add(TSStatusCode.DISPATCH_ERROR.getStatusCode()); NEED_RETRY.add(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()); NEED_RETRY.add(TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode()); @@ -213,14 +218,23 @@ public class StatusUtils { int code = status.getCode(); if (code == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { for (TSStatus subStatus : status.subStatus) { + // any sub codes for MULTIPLE_ERROR don't need to retry, we won't retry for the whole + // request if (subStatus == null - || (subStatus.getCode() != OK.code && !NEED_RETRY.contains(subStatus.getCode()))) { + || (subStatus.getCode() != OK.code + && !needRetryHelperForSingleStatus(subStatus.getCode()))) { return false; } } return true; } else { - return NEED_RETRY.contains(code); + return needRetryHelperForSingleStatus(code); } } + + // without MULTIPLE_ERROR(302) + private static boolean needRetryHelperForSingleStatus(int statusCode) { + return NEED_RETRY.contains(statusCode) + || (COMMON_CONFIG.isRetryForUnknownErrors() && UNKNOWN_ERRORS.contains(statusCode)); + } }
