This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6e5605ba11a Fixed Ratis query not retrying when DataNode restarts (#12029)
6e5605ba11a is described below
commit 6e5605ba11aed842aacf6069bac4842dc787283d
Author: Potato <[email protected]>
AuthorDate: Tue Feb 6 07:47:13 2024 +0800
Fixed Ratis query not retrying when DataNode restarts (#12029)
---
.../exception/RatisReadUnavailableException.java | 2 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../execution/executor/RegionExecutionResult.java | 9 +++++++++
.../execution/executor/RegionReadExecutor.java | 23 +++++++++++++++-------
.../scheduler/FragmentInstanceDispatcherImpl.java | 4 ++--
.../src/main/thrift/datanode.thrift | 1 +
6 files changed, 30 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
index f9505495d51..1ef5506942c 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.consensus.exception;
/** RaftServer is unable to serve linearizable read requests. */
public class RatisReadUnavailableException extends ConsensusException {
- public static final String RATIS_READ_UNAVAILABLE =
+ private static final String RATIS_READ_UNAVAILABLE =
"Raft Server cannot serve read requests now (leader is unknown or under
recovery). "
+ "Please try read later: ";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3969338788b..5a62e1ddd6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -308,7 +308,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
resp.setAccepted(executionResult.isAccepted());
resp.setMessage(executionResult.getMessage());
- // TODO
+ resp.setNeedRetry(executionResult.isNeedRetry());
return resp;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
index 7541329a76f..2047bba61ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java
@@ -28,6 +28,7 @@ public class RegionExecutionResult {
private String message;
private TSStatus status;
+ private boolean needRetry;
public boolean isAccepted() {
return accepted;
@@ -52,4 +53,12 @@ public class RegionExecutionResult {
public void setStatus(TSStatus status) {
this.status = status;
}
+
+ public boolean isNeedRetry() {
+ return needRetry;
+ }
+
+ public void setNeedRetry(boolean needRetry) {
+ this.needRetry = needRetry;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 9ac38701dac..37c6639fb2c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -32,6 +32,10 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,14 +73,14 @@ public class RegionReadExecutor {
public RegionExecutionResult execute(
ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
// execute fragment instance in state machine
- DataSet readResponse;
+ RegionExecutionResult resp = new RegionExecutionResult();
try (SetThreadName threadName = new
SetThreadName(fragmentInstance.getId().getFullId())) {
+ DataSet readResponse;
if (groupId instanceof DataRegionId) {
readResponse = dataRegionConsensus.read(groupId, fragmentInstance);
} else {
readResponse = schemaRegionConsensus.read(groupId, fragmentInstance);
}
- RegionExecutionResult resp = new RegionExecutionResult();
if (readResponse == null) {
LOGGER.error(RESPONSE_NULL_ERROR_MSG);
resp.setAccepted(false);
@@ -87,11 +91,16 @@ public class RegionReadExecutor {
resp.setMessage(info.getMessage());
}
return resp;
- } catch (Throwable t) {
- LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, t);
- RegionExecutionResult resp = new RegionExecutionResult();
- resp.setAccepted(false);
- resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
+ } catch (Throwable e) {
+ LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, e);
+ resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage()));
+ Throwable t = e.getCause();
+ if (t instanceof ReadException
+ || t instanceof ReadIndexException
+ || t instanceof NotLeaderException
+ || t instanceof ServerNotReadyException) {
+ resp.setNeedRetry(true);
+ }
return resp;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index c542bd09c8c..231118cc1f3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -301,8 +301,8 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
client.sendFragmentInstance(sendFragmentInstanceReq);
if (!sendFragmentInstanceResp.accepted) {
logger.warn(sendFragmentInstanceResp.message);
- if (sendFragmentInstanceResp.message.contains(
- RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) {
+ if (sendFragmentInstanceResp.isSetNeedRetry()
+ && sendFragmentInstanceResp.isNeedRetry()) {
throw new
RatisReadUnavailableException(sendFragmentInstanceResp.message);
} else {
throw new FragmentInstanceDispatchException(
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 5af2ef4657d..8afc6bbe3e7 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -116,6 +116,7 @@ struct TSendFragmentInstanceReq {
struct TSendFragmentInstanceResp {
1: required bool accepted
2: optional string message
+ 3: optional bool needRetry
}
struct TSendSinglePlanNodeReq {