This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch read_retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 730e56c9cfcae9ea10a66ffdecd898e45ef0f98c
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Tue Jan 23 11:34:06 2024 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../exception/RatisReadUnavailableException.java         | 13 +++++++++----
 .../org/apache/iotdb/consensus/ratis/RatisConsensus.java |  9 +++++++--
 .../plan/scheduler/FragmentInstanceDispatcherImpl.java   | 16 +++++++++++-----
 3 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
index d5ac76b93fe..f9505495d51 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
@@ -22,10 +22,15 @@ package org.apache.iotdb.consensus.exception;
 /** RaftServer is unable to serve linearizable read requests. */
 public class RatisReadUnavailableException extends ConsensusException {
 
+  public static final String RATIS_READ_UNAVAILABLE =
+      "Raft Server cannot serve read requests now (leader is unknown or under recovery). "
+          + "Please try read later: ";
+
   public RatisReadUnavailableException(Throwable cause) {
-    super(
-        "Raft Server cannot serve read requests now (leader is unknown or under recovery). "
-            + "Please try read later: ",
-        cause);
+    super(RATIS_READ_UNAVAILABLE, cause);
+  }
+
+  public RatisReadUnavailableException(String cause) {
+    super(cause);
   }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 950af3658e1..c627692b72d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -157,7 +157,12 @@ class RatisConsensus implements IConsensus {
     this.ratisMetricSet = new RatisMetricSet();
     this.readRetryPolicy =
         RetryPolicy.<RaftClientReply>newBuilder()
-            .setRetryHandler(c -> !c.isSuccess() && c.getException() instanceof ReadIndexException)
+            .setRetryHandler(
+                c ->
+                    !c.isSuccess()
+                        && (c.getException() instanceof ReadIndexException
+                            || c.getException() instanceof ReadException
+                            || c.getException() instanceof NotLeaderException))
             .setMaxAttempts(this.config.getImpl().getRetryTimesMax())
             .setWaitTime(
                 TimeDuration.valueOf(
@@ -347,7 +352,7 @@ class RatisConsensus implements IConsensus {
       if (canServeStaleRead != null && isLinearizableRead) {
         canServeStaleRead.get(groupId).set(true);
       }
-    } catch (ReadException | ReadIndexException e) {
+    } catch (ReadException | ReadIndexException | NotLeaderException e) {
       if (isLinearizableRead) {
         // linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve
         // read requests.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 08498bcb141..b48e8201629 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -299,9 +300,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
             logger.warn(sendFragmentInstanceResp.message);
-            throw new FragmentInstanceDispatchException(
-                RpcUtils.getStatus(
-                    TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
+            if (sendFragmentInstanceResp.message.contains(
+                RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) {
+              throw new RatisReadUnavailableException(sendFragmentInstanceResp.message);
+            } else {
+              throw new FragmentInstanceDispatchException(
+                  RpcUtils.getStatus(
+                      TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
+            }
           }
           break;
         case WRITE:
@@ -342,9 +348,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
                   TSStatusCode.EXECUTE_STATEMENT_ERROR,
                   String.format("unknown read type [%s]", instance.getType())));
       }
-    } catch (ClientManagerException | TException e) {
+    } catch (ClientManagerException | TException | RatisReadUnavailableException e) {
       logger.warn(
-          "can't connect to node {}, error msg is {}.",
+          "can't execute request on node {}, error msg is {}.",
           endPoint,
           ExceptionUtils.getRootCause(e).toString());
       TSStatus status = new TSStatus();

Reply via email to