This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e5605ba11a Fixed Ratis query not retrying when DataNode restarts (#12029)
6e5605ba11a is described below

commit 6e5605ba11aed842aacf6069bac4842dc787283d
Author: Potato <[email protected]>
AuthorDate: Tue Feb 6 07:47:13 2024 +0800

    Fixed Ratis query not retrying when DataNode restarts (#12029)
---
 .../exception/RatisReadUnavailableException.java   |  2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 .../execution/executor/RegionExecutionResult.java  |  9 +++++++++
 .../execution/executor/RegionReadExecutor.java     | 23 +++++++++++++++-------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  4 ++--
 .../src/main/thrift/datanode.thrift                |  1 +
 6 files changed, 30 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
index f9505495d51..1ef5506942c 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.consensus.exception;
 /** RaftServer is unable to serve linearizable read requests. */
 public class RatisReadUnavailableException extends ConsensusException {
 
-  public static final String RATIS_READ_UNAVAILABLE =
+  private static final String RATIS_READ_UNAVAILABLE =
       "Raft Server cannot serve read requests now (leader is unknown or under 
recovery). "
           + "Please try read later: ";
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3969338788b..5a62e1ddd6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -308,7 +308,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
     resp.setAccepted(executionResult.isAccepted());
     resp.setMessage(executionResult.getMessage());
-    // TODO
+    resp.setNeedRetry(executionResult.isNeedRetry());
     return resp;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
index 7541329a76f..2047bba61ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
@@ -28,6 +28,7 @@ public class RegionExecutionResult {
   private String message;
 
   private TSStatus status;
+  private boolean needRetry;
 
   public boolean isAccepted() {
     return accepted;
@@ -52,4 +53,12 @@ public class RegionExecutionResult {
   public void setStatus(TSStatus status) {
     this.status = status;
   }
+
+  public boolean isNeedRetry() {
+    return needRetry;
+  }
+
+  public void setNeedRetry(boolean needRetry) {
+    this.needRetry = needRetry;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 9ac38701dac..37c6639fb2c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -32,6 +32,10 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
 import org.apache.iotdb.db.utils.SetThreadName;
 
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,14 +73,14 @@ public class RegionReadExecutor {
   public RegionExecutionResult execute(
       ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
     // execute fragment instance in state machine
-    DataSet readResponse;
+    RegionExecutionResult resp = new RegionExecutionResult();
     try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
+      DataSet readResponse;
       if (groupId instanceof DataRegionId) {
         readResponse = dataRegionConsensus.read(groupId, fragmentInstance);
       } else {
         readResponse = schemaRegionConsensus.read(groupId, fragmentInstance);
       }
-      RegionExecutionResult resp = new RegionExecutionResult();
       if (readResponse == null) {
         LOGGER.error(RESPONSE_NULL_ERROR_MSG);
         resp.setAccepted(false);
@@ -87,11 +91,16 @@ public class RegionReadExecutor {
         resp.setMessage(info.getMessage());
       }
       return resp;
-    } catch (Throwable t) {
-      LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, t);
-      RegionExecutionResult resp = new RegionExecutionResult();
-      resp.setAccepted(false);
-      resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
+    } catch (Throwable e) {
+      LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, e);
+      resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage()));
+      Throwable t = e.getCause();
+      if (t instanceof ReadException
+          || t instanceof ReadIndexException
+          || t instanceof NotLeaderException
+          || t instanceof ServerNotReadyException) {
+        resp.setNeedRetry(true);
+      }
       return resp;
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index c542bd09c8c..231118cc1f3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -301,8 +301,8 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
             logger.warn(sendFragmentInstanceResp.message);
-            if (sendFragmentInstanceResp.message.contains(
-                RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) {
+            if (sendFragmentInstanceResp.isSetNeedRetry()
+                && sendFragmentInstanceResp.isNeedRetry()) {
               throw new 
RatisReadUnavailableException(sendFragmentInstanceResp.message);
             } else {
               throw new FragmentInstanceDispatchException(
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 5af2ef4657d..8afc6bbe3e7 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -116,6 +116,7 @@ struct TSendFragmentInstanceReq {
 struct TSendFragmentInstanceResp {
   1: required bool accepted
   2: optional string message
+  3: optional bool needRetry
 }
 
 struct TSendSinglePlanNodeReq {

Reply via email to