This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3846bb7f2bc Enhance the robustness of Ratis linearizable reads for
node offline scenarios (#11954)
3846bb7f2bc is described below
commit 3846bb7f2bc4dc8c63d973fd7fb039f25f4e8774
Author: Potato <[email protected]>
AuthorDate: Tue Jan 23 15:54:57 2024 +0800
Enhance the robustness of Ratis linearizable reads for node offline
scenarios (#11954)
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../exception/RatisReadUnavailableException.java | 13 +++++++++----
.../org/apache/iotdb/consensus/ratis/RatisConsensus.java | 9 +++++++--
.../plan/scheduler/FragmentInstanceDispatcherImpl.java | 16 +++++++++++-----
3 files changed, 27 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
index d5ac76b93fe..f9505495d51 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java
@@ -22,10 +22,15 @@ package org.apache.iotdb.consensus.exception;
/** RaftServer is unable to serve linearizable read requests. */
public class RatisReadUnavailableException extends ConsensusException {
+ public static final String RATIS_READ_UNAVAILABLE =
+ "Raft Server cannot serve read requests now (leader is unknown or under recovery). "
+ + "Please try read later: ";
+
public RatisReadUnavailableException(Throwable cause) {
- super(
- "Raft Server cannot serve read requests now (leader is unknown or under recovery). "
- + "Please try read later: ",
- cause);
+ super(RATIS_READ_UNAVAILABLE, cause);
+ }
+
+ public RatisReadUnavailableException(String cause) {
+ super(cause);
}
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 950af3658e1..c627692b72d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -157,7 +157,12 @@ class RatisConsensus implements IConsensus {
this.ratisMetricSet = new RatisMetricSet();
this.readRetryPolicy =
RetryPolicy.<RaftClientReply>newBuilder()
- .setRetryHandler(c -> !c.isSuccess() && c.getException() instanceof ReadIndexException)
+ .setRetryHandler(
+ c ->
+ !c.isSuccess()
+ && (c.getException() instanceof ReadIndexException
+ || c.getException() instanceof ReadException
+ || c.getException() instanceof NotLeaderException))
.setMaxAttempts(this.config.getImpl().getRetryTimesMax())
.setWaitTime(
TimeDuration.valueOf(
@@ -347,7 +352,7 @@ class RatisConsensus implements IConsensus {
if (canServeStaleRead != null && isLinearizableRead) {
canServeStaleRead.get(groupId).set(true);
}
- } catch (ReadException | ReadIndexException e) {
+ } catch (ReadException | ReadIndexException | NotLeaderException e) {
if (isLinearizableRead) {
// linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve
// read requests.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 08498bcb141..b48e8201629 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -299,9 +300,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
client.sendFragmentInstance(sendFragmentInstanceReq);
if (!sendFragmentInstanceResp.accepted) {
logger.warn(sendFragmentInstanceResp.message);
- throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
+ if (sendFragmentInstanceResp.message.contains(
+ RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) {
+ throw new RatisReadUnavailableException(sendFragmentInstanceResp.message);
+ } else {
+ throw new FragmentInstanceDispatchException(
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
+ }
}
break;
case WRITE:
@@ -342,9 +348,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format("unknown read type [%s]", instance.getType())));
}
- } catch (ClientManagerException | TException e) {
+ } catch (ClientManagerException | TException | RatisReadUnavailableException e) {
logger.warn(
- "can't connect to node {}, error msg is {}.",
+ "can't execute request on node {}, error msg is {}.",
endPoint,
ExceptionUtils.getRootCause(e).toString());
TSStatus status = new TSStatus();