This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch ratis_linearizable_read in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9b50cb5338983754ead5165ddb0d89779c619054 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Mon Feb 5 18:29:43 2024 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../impl/DataNodeInternalRPCServiceImpl.java | 2 +- .../execution/executor/RegionExecutionResult.java | 9 ++++++++ .../execution/executor/RegionReadExecutor.java | 24 +++++++++++++++------- .../scheduler/FragmentInstanceDispatcherImpl.java | 3 +-- .../src/main/thrift/datanode.thrift | 1 + 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 3969338788b..5a62e1ddd6a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -308,7 +308,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(); resp.setAccepted(executionResult.isAccepted()); resp.setMessage(executionResult.getMessage()); - // TODO + resp.setNeedRetry(executionResult.isNeedRetry()); return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java index 7541329a76f..2047bba61ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionExecutionResult.java @@ -28,6 +28,7 @@ public class RegionExecutionResult { private String message; private TSStatus status; + private boolean needRetry; public boolean isAccepted() { return accepted; @@ -52,4 +53,12 @@ public class RegionExecutionResult { public void setStatus(TSStatus status) { this.status = status; } + + public boolean isNeedRetry() { + return needRetry; + } + + public void setNeedRetry(boolean needRetry) { + this.needRetry = needRetry; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java index 9ac38701dac..d57dfc9fe9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.DataSet; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo; @@ -32,6 +33,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion; import org.apache.iotdb.db.utils.SetThreadName; +import org.apache.ratis.protocol.exceptions.NotLeaderException; +import org.apache.ratis.protocol.exceptions.ReadException; +import org.apache.ratis.protocol.exceptions.ReadIndexException; +import org.apache.ratis.protocol.exceptions.ServerNotReadyException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,14 +74,14 @@ public class RegionReadExecutor { public RegionExecutionResult execute( ConsensusGroupId groupId, FragmentInstance fragmentInstance) { // execute fragment instance in state machine - DataSet readResponse; + RegionExecutionResult resp = new RegionExecutionResult(); try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) { + DataSet readResponse; if (groupId instanceof DataRegionId) { readResponse = dataRegionConsensus.read(groupId, fragmentInstance); } else { readResponse = schemaRegionConsensus.read(groupId, fragmentInstance); } - RegionExecutionResult resp = new RegionExecutionResult(); if (readResponse == null) { LOGGER.error(RESPONSE_NULL_ERROR_MSG); resp.setAccepted(false); @@ -87,11 +92,16 @@ public class RegionReadExecutor { resp.setMessage(info.getMessage()); } return resp; - } catch (Throwable t) { - LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", groupId, t); - RegionExecutionResult resp = new RegionExecutionResult(); - resp.setAccepted(false); - resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage())); + } catch (ConsensusException e) { + LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", groupId, e); + resp.setMessage(String.format(ERROR_MSG_FORMAT, e.getMessage())); + Throwable t = e.getCause(); + if (t instanceof ReadException + || t instanceof ReadIndexException + || t instanceof NotLeaderException + || t instanceof ServerNotReadyException) { + resp.setNeedRetry(true); + } return resp; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index c542bd09c8c..bde804c2fc5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -301,8 +301,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { client.sendFragmentInstance(sendFragmentInstanceReq); if (!sendFragmentInstanceResp.accepted) { logger.warn(sendFragmentInstanceResp.message); - if (sendFragmentInstanceResp.message.contains( - RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) { + if (sendFragmentInstanceResp.needRetry) { throw new RatisReadUnavailableException(sendFragmentInstanceResp.message); } else { throw new FragmentInstanceDispatchException( diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 5af2ef4657d..8afc6bbe3e7 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -116,6 +116,7 @@ struct TSendFragmentInstanceReq { struct TSendFragmentInstanceResp { 1: required bool accepted 2: optional string message + 3: optional bool needRetry } struct TSendSinglePlanNodeReq {
