This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch read_retry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 730e56c9cfcae9ea10a66ffdecd898e45ef0f98c Author: OneSizeFitQuorum <[email protected]> AuthorDate: Tue Jan 23 11:34:06 2024 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../exception/RatisReadUnavailableException.java | 13 +++++++++---- .../org/apache/iotdb/consensus/ratis/RatisConsensus.java | 9 +++++++-- .../plan/scheduler/FragmentInstanceDispatcherImpl.java | 16 +++++++++++----- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java index d5ac76b93fe..f9505495d51 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java @@ -22,10 +22,15 @@ package org.apache.iotdb.consensus.exception; /** RaftServer is unable to serve linearizable read requests. */ public class RatisReadUnavailableException extends ConsensusException { + public static final String RATIS_READ_UNAVAILABLE = + "Raft Server cannot serve read requests now (leader is unknown or under recovery). " + + "Please try read later: "; + public RatisReadUnavailableException(Throwable cause) { - super( - "Raft Server cannot serve read requests now (leader is unknown or under recovery). " - + "Please try read later: ", - cause); + super(RATIS_READ_UNAVAILABLE, cause); + } + + public RatisReadUnavailableException(String cause) { + super(cause); } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 950af3658e1..c627692b72d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -157,7 +157,12 @@ class RatisConsensus implements IConsensus { this.ratisMetricSet = new RatisMetricSet(); this.readRetryPolicy = RetryPolicy.<RaftClientReply>newBuilder() - .setRetryHandler(c -> !c.isSuccess() && c.getException() instanceof ReadIndexException) + .setRetryHandler( + c -> + !c.isSuccess() + && (c.getException() instanceof ReadIndexException + || c.getException() instanceof ReadException + || c.getException() instanceof NotLeaderException)) .setMaxAttempts(this.config.getImpl().getRetryTimesMax()) .setWaitTime( TimeDuration.valueOf( @@ -347,7 +352,7 @@ class RatisConsensus implements IConsensus { if (canServeStaleRead != null && isLinearizableRead) { canServeStaleRead.get(groupId).set(true); } - } catch (ReadException | ReadIndexException e) { + } catch (ReadException | ReadIndexException | NotLeaderException e) { if (isLinearizableRead) { // linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve // read requests. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 08498bcb141..b48e8201629 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.consensus.exception.RatisReadUnavailableException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -299,9 +300,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { client.sendFragmentInstance(sendFragmentInstanceReq); if (!sendFragmentInstanceResp.accepted) { logger.warn(sendFragmentInstanceResp.message); - throw new FragmentInstanceDispatchException( - RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); + if (sendFragmentInstanceResp.message.contains( + RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) { + throw new RatisReadUnavailableException(sendFragmentInstanceResp.message); + } else { + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); + } } break; case WRITE: @@ -342,9 +348,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } - } catch (ClientManagerException | TException e) { + } catch (ClientManagerException | TException | RatisReadUnavailableException e) { logger.warn( - "can't connect to node {}, error msg is {}.", + "can't execute request on node {}, error msg is {}.", endPoint, ExceptionUtils.getRootCause(e).toString()); TSStatus status = new TSStatus();
