This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch ratis_linearizable_read
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9b50cb5338983754ead5165ddb0d89779c619054
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Mon Feb 5 18:29:43 2024 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 .../execution/executor/RegionExecutionResult.java  |  9 ++++++++
 .../execution/executor/RegionReadExecutor.java     | 24 +++++++++++++++-------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  3 +--
 .../src/main/thrift/datanode.thrift                |  1 +
 5 files changed, 29 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3969338788b..5a62e1ddd6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -308,7 +308,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
     resp.setAccepted(executionResult.isAccepted());
     resp.setMessage(executionResult.getMessage());
-    // TODO
+    resp.setNeedRetry(executionResult.isNeedRetry());
     return resp;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
index 7541329a76f..2047bba61ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
@@ -28,6 +28,7 @@ public class RegionExecutionResult {
   private String message;
 
   private TSStatus status;
+  private boolean needRetry;
 
   public boolean isAccepted() {
     return accepted;
@@ -52,4 +53,12 @@ public class RegionExecutionResult {
   public void setStatus(TSStatus status) {
     this.status = status;
   }
+
+  public boolean isNeedRetry() {
+    return needRetry;
+  }
+
+  public void setNeedRetry(boolean needRetry) {
+    this.needRetry = needRetry;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 9ac38701dac..d57dfc9fe9c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
@@ -32,6 +33,10 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
 import org.apache.iotdb.db.utils.SetThreadName;
 
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,14 +74,14 @@ public class RegionReadExecutor {
   public RegionExecutionResult execute(
       ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
     // execute fragment instance in state machine
-    DataSet readResponse;
+    RegionExecutionResult resp = new RegionExecutionResult();
     try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
+      DataSet readResponse;
       if (groupId instanceof DataRegionId) {
         readResponse = dataRegionConsensus.read(groupId, fragmentInstance);
       } else {
         readResponse = schemaRegionConsensus.read(groupId, fragmentInstance);
       }
-      RegionExecutionResult resp = new RegionExecutionResult();
       if (readResponse == null) {
         LOGGER.error(RESPONSE_NULL_ERROR_MSG);
         resp.setAccepted(false);
@@ -87,11 +92,16 @@ public class RegionReadExecutor {
         resp.setMessage(info.getMessage());
       }
       return resp;
-    } catch (Throwable t) {
-      LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, t);
-      RegionExecutionResult resp = new RegionExecutionResult();
-      resp.setAccepted(false);
-      resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
+    } catch (ConsensusException e) {
+      LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, e);
+      resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage()));
+      Throwable t = e.getCause();
+      if (t instanceof ReadException
+          || t instanceof ReadIndexException
+          || t instanceof NotLeaderException
+          || t instanceof ServerNotReadyException) {
+        resp.setNeedRetry(true);
+      }
       return resp;
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index c542bd09c8c..bde804c2fc5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -301,8 +301,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
             logger.warn(sendFragmentInstanceResp.message);
-            if (sendFragmentInstanceResp.message.contains(
-                RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) {
+            if (sendFragmentInstanceResp.needRetry) {
               throw new 
RatisReadUnavailableException(sendFragmentInstanceResp.message);
             } else {
               throw new FragmentInstanceDispatchException(
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 5af2ef4657d..8afc6bbe3e7 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -116,6 +116,7 @@ struct TSendFragmentInstanceReq {
 struct TSendFragmentInstanceResp {
   1: required bool accepted
   2: optional string message
+  3: optional bool needRetry
 }
 
 struct TSendSinglePlanNodeReq {

Reply via email to