This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch consensus_module_refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1608c75beb77c755a703e2ec6b6c2f7fbc7cb775 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Aug 16 15:05:07 2023 +0800 init Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../manager/consensus/ConsensusManager.java | 6 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 2 +- .../org/apache/iotdb/consensus/IConsensus.java | 128 +++++------ .../exception/RatisUnderRecoveryException.java | 2 +- .../apache/iotdb/consensus/iot/IoTConsensus.java | 155 +++++--------- .../iotdb/consensus/ratis/RatisConsensus.java | 234 +++++++++------------ .../iotdb/consensus/simple/SimpleConsensus.java | 140 ++++++------ ...verImpl.java => SimpleConsensusServerImpl.java} | 4 +- .../apache/iotdb/consensus/iot/ReplicateTest.java | 12 +- .../apache/iotdb/consensus/iot/StabilityTest.java | 16 +- .../iotdb/consensus/ratis/RatisConsensusTest.java | 60 +++--- .../iotdb/consensus/ratis/RecoverReadTest.java | 8 +- .../iotdb/consensus/simple/RecoveryTest.java | 4 +- .../consensus/simple/SimpleConsensusTest.java | 35 ++- .../impl/DataNodeInternalRPCServiceImpl.java | 8 +- .../thrift/impl/DataNodeRegionManager.java | 4 +- .../execution/executor/RegionWriteExecutor.java | 94 ++++----- .../iotdb/db/service/RegionMigrateService.java | 12 +- .../DataNodeInternalRPCServiceImplTest.java | 4 +- .../apache/iotdb/commons/utils/StatusUtils.java | 2 +- 20 files changed, 403 insertions(+), 527 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 740390d902d..95182ce9ab1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -250,7 +250,7 @@ public class ConsensusManager { configNodeLocation.getConfigNodeId(), configNodeLocation.getConsensusEndPoint())); } - consensusImpl.createPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList); + consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList); } /** @@ -262,7 +262,7 @@ public class ConsensusManager { public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException { boolean result = consensusImpl - .addPeer( + .addRemotePeer( DEFAULT_CONSENSUS_GROUP_ID, new Peer( DEFAULT_CONSENSUS_GROUP_ID, @@ -284,7 +284,7 @@ public class ConsensusManager { */ public boolean removeConfigNodePeer(TConfigNodeLocation configNodeLocation) { return consensusImpl - .removePeer( + .removeRemotePeer( DEFAULT_CONSENSUS_GROUP_ID, new Peer( DEFAULT_CONSENSUS_GROUP_ID, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 8465fdc6c80..4d352cf59db 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -625,7 +625,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac ConsensusGroupId groupId = configManager.getConsensusManager().getConsensusGroupId(); ConsensusGenericResponse resp = - configManager.getConsensusManager().getConsensusImpl().deletePeer(groupId); + configManager.getConsensusManager().getConsensusImpl().deleteLocalPeer(groupId); if (!resp.isSuccess()) { return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()) .setMessage( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index 8a101e721e5..1f51c09c743 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -19,12 +19,12 @@ package org.apache.iotdb.consensus; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.exception.ConsensusException; import javax.annotation.concurrent.ThreadSafe; @@ -32,120 +32,96 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; import java.util.List; -/** Consensus module base class. */ +/** + * Consensus module base interface. + */ @ThreadSafe public interface IConsensus { + /** + * Start the consensus module + */ void start() throws IOException; + /** + * Stop the consensus module + */ void stop() throws IOException; - // write API - ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request); + /** + * Write data to the corresponding consensus group + * + * @param groupId the consensus group this request belongs + * @param request write request + */ + TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) throws ConsensusException; - // read API - ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request); + /** + * Read data from the corresponding consensus group + * + * @param groupId the consensus group this request belongs + * @param request read request + */ + DataSet read(ConsensusGroupId groupId, IConsensusRequest request) throws ConsensusException; // multi consensus group API /** * Require the <em>local node</em> to create a Peer and become a member of the given consensus * group. This node will prepare and initialize local statemachine {@link IStateMachine} and other - * data structures. After this method returns, we can call {@link #addPeer(ConsensusGroupId, - * Peer)} to notify original group that this new Peer is prepared to be added into the latest - * configuration. createPeer should be called on a node that does not contain any peer of the - * consensus group, to avoid one node having more than one replica. + * data structures. After this method returns, we can call + * {@link #addRemotePeer(ConsensusGroupId, Peer)} to notify original group that this new Peer is + * prepared to be added into the latest configuration. createLocalPeer should be called on a node + * that does not contain any peer of the consensus group, to avoid one node having more than one + * replica. * * @param groupId the consensus group this Peer belongs - * @param peers other known peers in this group + * @param peers other known peers in this group */ - ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers); + void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) throws ConsensusException; /** * When the <em>local node</em> is no longer a member of the given consensus group, call this * method to do cleanup works. This method will close local statemachine {@link IStateMachine}, - * delete local data and do other cleanup works. Be sure this method is called after successfully - * removing this peer from current consensus group configuration (by calling {@link - * #removePeer(ConsensusGroupId, Peer)} or {@link #changePeer(ConsensusGroupId, List)}). + * delete local data and do other cleanup works. deleteLocalPeer should be called after + * successfully removing this peer from current consensus group configuration (by calling + * {@link #removeRemotePeer(ConsensusGroupId, Peer)}). * * @param groupId the consensus group this Peer used to belong */ - ConsensusGenericResponse deletePeer(ConsensusGroupId groupId); + void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException; // single consensus group API /** - * Tell the group that a new Peer is prepared to be added into this group. Call {@link - * #createPeer(ConsensusGroupId, List)} on the new Peer before calling this method. When this - * method returns, the group data should be already transmitted to the new Peer. That is, the new - * peer is available to answer client requests by the time this method successfully returns. - * addPeer should be called on a living peer of the consensus group. For example: We'd like to add - * a peer D to (A, B, C) group. We need to execute addPeer in A, B or C. + * Tell the group that a new Peer is prepared to be added into this group. Call + * {@link #createLocalPeer(ConsensusGroupId, List)} on the new Peer before calling this method. + * When this method returns, the group data should be already transmitted to the new Peer. That + * is, the new peer is available to answer client requests by the time this method successfully + * returns. addRemotePeer should be called on a living peer of the consensus group. For example: + * We'd like to add a peer D to (A, B, C) group. We need to execute addPeer in A, B or C. * * @param groupId the consensus group this peer belongs - * @param peer the newly added peer + * @param peer the newly added peer */ - ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer); + void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException; /** * Tell the group to remove an active Peer. The removed peer can no longer answer group requests - * when this method successfully returns. Call {@link #deletePeer(ConsensusGroupId)} on the - * removed Peer to do cleanup jobs after this method successfully returns. removePeer should be - * called on a living peer of its consensus group. For example: a group has A, B, C. We'd like to - * remove C, in case C is dead, the removePeer should be sent to A or B. + * when this method successfully returns. Call {@link #deleteLocalPeer(ConsensusGroupId)} on the + * removed Peer to do cleanup jobs after this method successfully returns. removeRemotePeer should + * be called on a living peer of its consensus group. For example: a group has A, B, C. We'd like + * to remove C, in case C is dead, the removePeer should be sent to A or B. * * @param groupId the consensus group this peer belongs - * @param peer the peer to be removed - */ - ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer); - - /** - * Tell the group to update an active Peer. The modifiable part of {@link Peer} is TEndPoint. - * - * @param groupId the consensus group this peer belongs - * @param oldPeer the peer to be updated - * @param newPeer the peer to be updated to - */ - // TODO: @SzyWilliam @SpriCoder Please implement this method - ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer); - - /** - * Change group configuration. This method allows you to add/remove multiple Peers at once. This - * method is similar to {@link #addPeer(ConsensusGroupId, Peer)} or {@link - * #removePeer(ConsensusGroupId, Peer)} - * - * @param groupId the consensus group - * @param newPeers the new member configuration of this group - */ - ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers); - - /** - * Tell the group to [create a new Peer on new node] and [add this member to join the group]. - * - * <p>The underlying implementation should <br> - * 1. first call createPeer on the new member <br> - * 2. then call addPeer for configuration change <br> - * Call this method on any node of the <em>original group</em>. <br> - * NOTICE: Currently only RatisConsensus implements this method. - * - * @param groupId the consensus group - * @param newNode the new member - * @param originalGroup the original members of the existed group + * @param peer the peer to be removed */ - default ConsensusGenericResponse addNewNodeToExistedGroup( - ConsensusGroupId groupId, Peer newNode, List<Peer> originalGroup) { - return ConsensusGenericResponse.newBuilder() - .setSuccess(false) - .setException( - new ConsensusException( - "addNewNodeToExistedGroup method is not implemented by " + this + " class")) - .build(); - } + void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException; // management API - ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader); + void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException; - ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId); + void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException; boolean isLeader(ConsensusGroupId groupId); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java index 762b7ea432c..f614c2bd3f0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java @@ -24,7 +24,7 @@ public class RatisUnderRecoveryException extends ConsensusException { public RatisUnderRecoveryException(Throwable cause) { super( - "Ratis Server is redoing Raft Log and cannot serve read requests now. Please try read later: " + "Raft Server is redoing Raft Log and cannot serve read requests now. Please try read later: " + cause, cause); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 047d557ad55..aee5a970ec3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -19,16 +19,20 @@ package org.apache.iotdb.consensus.iot; +import java.util.Optional; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.IStateMachine.Registry; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; @@ -42,6 +46,7 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; import org.apache.iotdb.consensus.exception.IllegalPeerNumException; +import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient; import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.AsyncIoTConsensusServiceClientPoolFactory; import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory; @@ -49,9 +54,11 @@ import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient; import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager; import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService; import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor; +import org.apache.iotdb.consensus.simple.SimpleConsensusServerImpl; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.ratis.protocol.GroupManagementRequest.Op; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +112,7 @@ public class IoTConsensus implements IConsensus { } @Override - public void start() throws IOException { + public synchronized void start() throws IOException { initAndRecover(); service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this)); try { @@ -118,7 +125,7 @@ public class IoTConsensus implements IConsensus { private void initAndRecover() throws IOException { if (!storageDir.exists()) { if (!storageDir.mkdirs()) { - logger.warn("Unable to create consensus dir at {}", storageDir); + throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); } } else { try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) { @@ -144,7 +151,7 @@ public class IoTConsensus implements IConsensus { } @Override - public void stop() { + public synchronized void stop() { stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop); clientManager.close(); syncClientManager.close(); @@ -152,62 +159,52 @@ public class IoTConsensus implements IConsensus { } @Override - public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) { - IoTConsensusServerImpl impl = stateMachineMap.get(groupId); - if (impl == null) { - return ConsensusWriteResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); - } - - TSStatus status; + public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) + throws ConsensusException { + IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); if (impl.isReadOnly()) { - status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()); - status.setMessage("Fail to do non-query operations because system is read-only."); + return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY); } else if (!impl.isActive()) { - // TODO: (xingtanzjr) whether we need to define a new status to indicate the inactive status ? - status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT); - status.setMessage("peer is inactive and not ready to receive sync log request."); + return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, + "peer is inactive and not ready to receive sync log request."); } else { - status = impl.write(request); + return impl.write(request); } - return ConsensusWriteResponse.newBuilder().setStatus(status).build(); } @Override - public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) { - IoTConsensusServerImpl impl = stateMachineMap.get(groupId); - if (impl == null) { - return ConsensusReadResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); - } - return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build(); + public DataSet read(ConsensusGroupId groupId, IConsensusRequest request) + throws ConsensusException { + return Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)) + .read(request); } + @SuppressWarnings("java:S2201") @Override - public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) { + public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) + throws ConsensusException { int consensusGroupSize = peers.size(); if (consensusGroupSize == 0) { - return ConsensusGenericResponse.newBuilder() - .setException(new IllegalPeerNumException(consensusGroupSize)) - .build(); + throw new IllegalPeerNumException(consensusGroupSize); } if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) { - return ConsensusGenericResponse.newBuilder() - .setException(new IllegalPeerEndpointException(thisNode, peers)) - .build(); + throw new IllegalPeerEndpointException(thisNode, peers); } AtomicBoolean exist = new AtomicBoolean(true); - stateMachineMap.computeIfAbsent( + Optional.ofNullable(stateMachineMap.computeIfAbsent( groupId, k -> { exist.set(false); + String path = buildPeerDir(storageDir, groupId); File file = new File(path); if (!file.mkdirs()) { logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); + return null; } + IoTConsensusServerImpl impl = new IoTConsensusServerImpl( path, @@ -219,17 +216,15 @@ public class IoTConsensus implements IConsensus { config); impl.start(); return impl; - }); + })).orElseThrow(() -> new ConsensusException( + String.format("Unable to create consensus dir for group %s", groupId))); if (exist.get()) { - return ConsensusGenericResponse.newBuilder() - .setException(new ConsensusGroupAlreadyExistException(groupId)) - .build(); + throw new ConsensusGroupAlreadyExistException(groupId); } - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } @Override - public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) { + public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException { AtomicBoolean exist = new AtomicBoolean(false); stateMachineMap.computeIfPresent( groupId, @@ -239,22 +234,17 @@ public class IoTConsensus implements IConsensus { FileUtils.deleteDirectory(new File(buildPeerDir(storageDir, groupId))); return null; }); - if (!exist.get()) { - return ConsensusGenericResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); + throw new ConsensusGroupNotExistException(groupId); } - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } @Override - public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) { - IoTConsensusServerImpl impl = stateMachineMap.get(groupId); - if (impl == null) { - return ConsensusGenericResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); + public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException { + IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + if (impl.getConfiguration().contains(peer)) { + throw new PeerAlreadyInConsensusGroupException(groupId, peer); } try { // step 1: inactive new Peer to prepare for following steps @@ -287,14 +277,9 @@ public class IoTConsensus implements IConsensus { doSpotClean(peer, impl); } catch (ConsensusGroupModifyPeerException e) { - logger.error("cannot execute addPeer() for {}", peer, e); - return ConsensusGenericResponse.newBuilder() - .setSuccess(false) - .setException(new ConsensusException(e.getMessage())) - .build(); + logger.error(String.format("cannot execute addPeer() for %s", peer), e); + throw new ConsensusException(e.getMessage()); } - - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } private void doSpotClean(Peer peer, IoTConsensusServerImpl impl) { @@ -306,21 +291,16 @@ public class IoTConsensus implements IConsensus { } @Override - public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) { - IoTConsensusServerImpl impl = stateMachineMap.get(groupId); - if (impl == null) { - return ConsensusGenericResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); - } + public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) + throws ConsensusException { + IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + try { // let other peers remove the sync channel with target peer impl.notifyPeersToRemoveSyncLogChannel(peer); } catch (ConsensusGroupModifyPeerException e) { - return ConsensusGenericResponse.newBuilder() - .setSuccess(false) - .setException(new ConsensusException(e.getMessage())) - .build(); + throw new ConsensusException(e.getMessage()); } try { @@ -330,44 +310,25 @@ public class IoTConsensus implements IConsensus { impl.waitTargetPeerUntilSyncLogCompleted(peer); } catch (ConsensusGroupModifyPeerException e) { // we only log warning here because sometimes the target peer may already be down - logger.warn("cannot wait {} to complete SyncLog. error message: {}", peer, e.getMessage()); + logger.warn(String.format("cannot wait %s to complete SyncLog.", peer), e); + throw new ConsensusException(e.getMessage()); } - - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); - } - - @Override - public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) { - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } @Override - public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); + public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException { + throw new ConsensusException("IoTConsensus does not support leader transfer"); } @Override - public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) { - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); - } - - @Override - public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) { - IoTConsensusServerImpl impl = stateMachineMap.get(groupId); - if (impl == null) { - return ConsensusGenericResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); - } + public void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException { + IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); try { impl.takeSnapshot(); } catch (ConsensusGroupModifyPeerException e) { - return ConsensusGenericResponse.newBuilder() - .setSuccess(false) - .setException(new ConsensusException(e.getMessage())) - .build(); + throw new ConsensusException(e.getMessage()); } - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } @Override diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 27c8bb0d271..6d3d1e5567c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -19,6 +19,7 @@ package org.apache.iotdb.consensus.ratis; +import java.util.Optional; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -45,6 +46,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.RatisConfig; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.NodeReadOnlyException; import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; @@ -73,8 +75,12 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.SnapshotManagementRequest; +import org.apache.ratis.protocol.exceptions.AlreadyExistsException; +import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.protocol.exceptions.RaftException; +import org.apache.ratis.protocol.exceptions.ReadException; +import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.protocol.exceptions.ResourceUnavailableException; import org.apache.ratis.server.DivisionInfo; import org.apache.ratis.server.RaftServer; @@ -97,12 +103,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -/** A multi-raft consensus implementation based on Apache Ratis. */ +/** + * A multi-raft consensus implementation based on Apache Ratis. + */ class RatisConsensus implements IConsensus { private static final Logger logger = LoggerFactory.getLogger(RatisConsensus.class); - /** the unique net communication endpoint */ + /** + * the unique net communication endpoint + */ private final RaftPeer myself; private final RaftServer server; @@ -110,7 +120,7 @@ class RatisConsensus implements IConsensus { private final RaftProperties properties = new RaftProperties(); private final RaftClientRpc clientRpc; - private final IClientManager<RaftGroup, RatisClient> clientManager; + private final IClientManager<RaftGroup, org.apache.iotdb.consensus.ratis.RatisClient> clientManager; private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>(); @@ -120,7 +130,6 @@ class RatisConsensus implements IConsensus { private static final int DEFAULT_PRIORITY = 0; private static final int LEADER_PRIORITY = 1; - /** TODO make it configurable */ private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); private final ExecutorService addExecutor; @@ -159,7 +168,7 @@ class RatisConsensus implements IConsensus { ThreadName.RATIS_BG_DISK_GUARDIAN.getName()); clientManager = - new IClientManager.Factory<RaftGroup, RatisClient>() + new IClientManager.Factory<RaftGroup, org.apache.iotdb.consensus.ratis.RatisClient>() .createClientManager(new RatisClientPoolFactory()); clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties); @@ -170,7 +179,7 @@ class RatisConsensus implements IConsensus { .setProperties(properties) .setStateMachineRegistry( raftGroupId -> - new ApplicationStateMachineProxy( + new org.apache.iotdb.consensus.ratis.ApplicationStateMachineProxy( registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)), raftGroupId, canServeStaleRead)) @@ -178,14 +187,14 @@ class RatisConsensus implements IConsensus { } @Override - public void start() throws IOException { + public synchronized void start() throws IOException { MetricService.getInstance().addMetricSet(this.ratisMetricSet); server.start(); startSnapshotGuardian(); } @Override - public void stop() throws IOException { + public synchronized void stop() throws IOException { addExecutor.shutdown(); diskGuardian.shutdown(); try { @@ -206,7 +215,9 @@ class RatisConsensus implements IConsensus { return !reply.isSuccess() && (reply.getException() instanceof ResourceUnavailableException); } - /** launch a consensus write with retry mechanism */ + /** + * launch a consensus write with retry mechanism + */ private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller) throws IOException { @@ -245,120 +256,125 @@ class RatisConsensus implements IConsensus { return writeWithRetry(() -> server.submitClientRequest(request)); } - private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message message) + private RaftClientReply writeRemotelyWithRetry( + org.apache.iotdb.consensus.ratis.RatisClient client, Message message) throws IOException { return writeWithRetry(() -> client.getRaftClient().io().send(message)); } /** - * write will first send request to local server use method call if local server is not leader, it - * will use RaftClient to send RPC to read leader + * write will first send request to local server using local method call. If local server is not + * leader, it will use RaftClient to send RPC to read leader */ @Override - public ConsensusWriteResponse write( - ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) { + public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) + throws ConsensusException { // pre-condition: group exists and myself server serves this group - RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId); + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); RaftGroup raftGroup = getGroupInfo(raftGroupId); if (raftGroup == null || !raftGroup.getPeers().contains(myself)) { - return failedWrite(new ConsensusGroupNotExistException(consensusGroupId)); + throw new ConsensusGroupNotExistException(groupId); } // current Peer is group leader and in ReadOnly State - if (isLeader(consensusGroupId) && Utils.rejectWrite()) { + if (isLeader(groupId) && Utils.rejectWrite()) { try { forceStepDownLeader(raftGroup); } catch (Exception e) { logger.warn("leader {} read only, force step down failed due to {}", myself, e); } - return failedWrite(new NodeReadOnlyException(myself)); + throw new NodeReadOnlyException(myself); } // serialize request into Message - Message message = new RequestMessage(IConsensusRequest); + Message message = new RequestMessage(request); // 1. first try the local server RaftClientRequest clientRequest = buildRawRequest(raftGroupId, message, RaftClientRequest.writeRequestType()); - RaftClientReply localServerReply; RaftPeer suggestedLeader = null; - if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) { + if (isLeader(groupId) && waitUntilLeaderReady(raftGroupId)) { try (AutoCloseable ignored = RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) { - localServerReply = writeLocallyWithRetry(clientRequest); + RaftClientReply localServerReply = writeLocallyWithRetry(clientRequest); if (localServerReply.isSuccess()) { - ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage(); - TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder(); - return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build(); + org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage = (org.apache.iotdb.consensus.ratis.ResponseMessage) localServerReply.getMessage(); + return Optional.ofNullable(responseMessage.getContentHolder()).map(TSStatus.class::cast) + .orElse(null); } NotLeaderException ex = localServerReply.getNotLeaderException(); - if (ex != null) { // local server is not leader + if (ex != null) { suggestedLeader = ex.getSuggestedLeader(); } } catch (Exception e) { - return failedWrite(new RatisRequestFailedException(e)); + throw new RatisRequestFailedException(e); } } // 2. try raft client TSStatus writeResult; try (AutoCloseable ignored = - RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType); - RatisClient client = getRaftClient(raftGroup)) { + RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType); + org.apache.iotdb.consensus.ratis.RatisClient client = getRaftClient(raftGroup)) { RaftClientReply reply = writeRemotelyWithRetry(client, message); if (!reply.isSuccess()) { - return failedWrite(new RatisRequestFailedException(reply.getException())); + throw new RatisRequestFailedException(reply.getException()); } writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer()); } catch (Exception e) { - return failedWrite(new RatisRequestFailedException(e)); + throw new RatisRequestFailedException(e); } if (suggestedLeader != null) { TEndPoint leaderEndPoint = Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress()); writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), leaderEndPoint.getPort())); } - return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build(); + return writeResult; } - /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */ + /** + * Read directly from LOCAL COPY notice: + */ @Override - public ConsensusReadResponse read( - ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) { - RaftGroupId groupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId); - RaftGroup group = getGroupInfo(groupId); + public DataSet read(ConsensusGroupId groupId, IConsensusRequest request) + throws ConsensusException { + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); + RaftGroup group = getGroupInfo(raftGroupId); if (group == null || !group.getPeers().contains(myself)) { - return failedRead(new ConsensusGroupNotExistException(consensusGroupId)); + throw new ConsensusGroupNotExistException(groupId); } final boolean isLinearizableRead = - !canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new AtomicBoolean(false)).get(); + !canServeStaleRead.computeIfAbsent(groupId, id -> new AtomicBoolean(false)).get(); RaftClientReply reply; try { - reply = doRead(groupId, IConsensusRequest, isLinearizableRead); + reply = doRead(raftGroupId, request, isLinearizableRead); // allow stale read if current linearizable read returns successfully if (isLinearizableRead) { - canServeStaleRead.get(consensusGroupId).set(true); + canServeStaleRead.get(groupId).set(true); } - } catch (Exception e) { + } catch (ReadException | ReadIndexException e) { if (isLinearizableRead) { // linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve // read requests. - return failedRead(new RatisUnderRecoveryException(e)); + throw new RatisUnderRecoveryException(e); } else { - return failedRead(new RatisRequestFailedException(e)); + throw new RatisRequestFailedException(e); } + } catch (Exception e) { + throw new RatisRequestFailedException(e); } - Message ret = reply.getMessage(); - ResponseMessage readResponseMessage = (ResponseMessage) ret; - DataSet dataSet = (DataSet) readResponseMessage.getContentHolder(); - return ConsensusReadResponse.newBuilder().setDataSet(dataSet).build(); + org.apache.iotdb.consensus.ratis.ResponseMessage readResponseMessage = (org.apache.iotdb.consensus.ratis.ResponseMessage) ret; + return Optional.ofNullable(readResponseMessage.getContentHolder()).map(DataSet.class::cast) + .orElse(null); } - /** return a success raft client reply or throw an Exception */ + /** + * return a success raft client reply or throw an Exception + */ private RaftClientReply doRead( RaftGroupId gid, IConsensusRequest readRequest, boolean linearizable) throws Exception { final RaftClientRequest.Type readType = @@ -390,25 +406,22 @@ class RatisConsensus implements IConsensus { * register self to the RaftGroup */ @Override - public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) { + public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) + throws ConsensusException { RaftGroup group = buildRaftGroup(groupId, peers); - // add RaftPeer myself to this RaftGroup - return addNewGroupToServer(group, myself); - } - - private ConsensusGenericResponse addNewGroupToServer(RaftGroup group, RaftPeer server) { - RaftClientReply reply; RaftGroup clientGroup = - group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), server) : group; - try (RatisClient client = getRaftClient(clientGroup)) { - reply = client.getRaftClient().getGroupManagementApi(server.getId()).add(group); + group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), myself) : group; + try (org.apache.iotdb.consensus.ratis.RatisClient client = getRaftClient(clientGroup)) { + RaftClientReply reply = client.getRaftClient().getGroupManagementApi(myself.getId()) + .add(group); if (!reply.isSuccess()) { - return failed(new RatisRequestFailedException(reply.getException())); + throw new RatisRequestFailedException(reply.getException()); } + } catch (AlreadyExistsException e) { + throw new ConsensusGroupAlreadyExistException(groupId); } catch (Exception e) { - return failed(new RatisRequestFailedException(e)); + throw new RatisRequestFailedException(e); } - return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } /** @@ -419,7 +432,7 @@ class RatisConsensus implements IConsensus { * clean up */ @Override - public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) { + public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException { RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); // send remove group to myself @@ -435,13 +448,13 @@ class RatisConsensus implements IConsensus { true, false)); if (!reply.isSuccess()) { - return failed(new RatisRequestFailedException(reply.getException())); + throw new RatisRequestFailedException(reply.getException()); } + } catch (GroupMismatchException e) { + throw new ConsensusGroupNotExistException(groupId); } catch (IOException e) { - return failed(new RatisRequestFailedException(e)); + throw new RatisRequestFailedException(e); } - - return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } /** @@ -451,32 +464,25 @@ class RatisConsensus implements IConsensus { * change */ @Override - public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) { + public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException { RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); - RaftGroup group = getGroupInfo(raftGroupId); - RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY); + RaftGroup group = getGroupInfo(raftGroupId); // pre-conditions: group exists and myself in this group if (group == null || !group.getPeers().contains(myself)) { - return failed(new ConsensusGroupNotExistException(groupId)); + throw new ConsensusGroupNotExistException(groupId); } + RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY); // pre-condition: peer not in this group if (group.getPeers().contains(peerToAdd)) { - return failed(new PeerAlreadyInConsensusGroupException(groupId, peer)); + throw new PeerAlreadyInConsensusGroupException(groupId, peer); } List<RaftPeer> newConfig = new ArrayList<>(group.getPeers()); newConfig.add(peerToAdd); - RaftClientReply reply; - try { - reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig)); - } catch (RatisRequestFailedException e) { - return failed(e); - } - - return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); + sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig)); } /** @@ -486,7 +492,7 @@ class RatisConsensus implements IConsensus { * change */ @Override - public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) { + public ConsensusGenericResponse removeRemotePeer(ConsensusGroupId groupId, Peer peer) { RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); RaftGroup group = getGroupInfo(raftGroupId); RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY); @@ -516,53 +522,26 @@ class RatisConsensus implements IConsensus { return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } - @Override - public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); - } - - @Override - public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) { - RaftGroup raftGroup = buildRaftGroup(groupId, newPeers); - - // pre-conditions: myself in this group - if (!raftGroup.getPeers().contains(myself)) { - return failed(new ConsensusGroupNotExistException(groupId)); - } - - // add RaftPeer myself to this RaftGroup - RaftClientReply reply; - try { - reply = sendReconfiguration(raftGroup); - } catch (RatisRequestFailedException e) { - return failed(e); - } - return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); - } /** * NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader. * transferLeader is implemented by 1. modify peer priority 2. ask current leader to step down * - * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and degrade all follower peers - * to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does not allow a peer with - * priority <= currentLeader.priority to becomes the leader, so we have to upgrade leader's - * priority to 1 + * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and degrade all follower + * peers to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does not allow a + * peer with priority <= currentLeader.priority to becomes the leader, so we have to upgrade + * leader's priority to 1 * * <p>2. call transferLeadership to force current leader to step down and raise a new round of * election. In this election, the newLeader peer with priority 1 is guaranteed to be elected. */ @Override - public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) { + public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException { // first fetch the newest information - RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); - RaftGroup raftGroup = getGroupInfo(raftGroupId); - - if (raftGroup == null) { - return failed(new ConsensusGroupNotExistException(groupId)); - } + RaftGroup raftGroup = Optional.ofNullable(getGroupInfo(raftGroupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY); @@ -581,21 +560,20 @@ class RatisConsensus implements IConsensus { } RaftClientReply reply; - try (RatisClient client = getRaftClient(raftGroup)) { + try (org.apache.iotdb.consensus.ratis.RatisClient client = getRaftClient(raftGroup)) { RaftClientReply configChangeReply = client.getRaftClient().admin().setConfiguration(newConfiguration); if (!configChangeReply.isSuccess()) { - return failed(new RatisRequestFailedException(configChangeReply.getException())); + throw new RatisRequestFailedException(configChangeReply.getException()); } reply = transferLeader(raftGroup, newRaftLeader); if (!reply.isSuccess()) { - return failed(new RatisRequestFailedException(reply.getException())); + throw new RatisRequestFailedException(reply.getException()); } } catch (Exception e) { - return failed(new RatisRequestFailedException(e)); + throw new RatisRequestFailedException(e); } - return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } private void forceStepDownLeader(RaftGroup group) throws Exception { @@ -605,8 +583,7 @@ class RatisConsensus implements IConsensus { } private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws Exception { - try (RatisClient client = getRaftClient(group)) { - // TODO tuning for timeoutMs + try (org.apache.iotdb.consensus.ratis.RatisClient client = getRaftClient(group)) { return client .getRaftClient() .admin() @@ -650,7 +627,7 @@ class RatisConsensus implements IConsensus { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.warn("Unexpected interruption", e); + logger.warn("Unexpected interruption when waitUntilLeaderReady", e); return false; } return divisionInfo.isLeader(); @@ -690,12 +667,11 @@ class RatisConsensus implements IConsensus { } @Override - public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) { + public void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException { RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); RaftGroup groupInfo = getGroupInfo(raftGroupId); - if (groupInfo == null || !groupInfo.getPeers().contains(myself)) { - return failed(new ConsensusGroupNotExistException(groupId)); + throw new ConsensusGroupNotExistException(groupId); } // TODO tuning snapshot create timeout @@ -707,13 +683,11 @@ class RatisConsensus implements IConsensus { try { reply = server.snapshotManagement(request); if (!reply.isSuccess()) { - return failed(new RatisRequestFailedException(reply.getException())); + throw new RatisRequestFailedException(reply.getException()); } } catch (IOException ioException) { - return failed(new RatisRequestFailedException(ioException)); + throw new RatisRequestFailedException(ioException); } - - return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } private void triggerSnapshotByCustomize() { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java index fe96ddfa918..5544bad9a53 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java @@ -25,15 +25,16 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.IStateMachine.Registry; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; @@ -51,6 +52,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,7 +69,8 @@ class SimpleConsensus implements IConsensus { private final int thisNodeId; private final File storageDir; private final IStateMachine.Registry registry; - private final Map<ConsensusGroupId, SimpleServerImpl> stateMachineMap = new ConcurrentHashMap<>(); + private final Map<ConsensusGroupId, SimpleConsensusServerImpl> stateMachineMap = + new ConcurrentHashMap<>(); private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance(); @@ -79,14 +82,14 @@ class SimpleConsensus implements IConsensus { } @Override - public void start() throws IOException { + public synchronized void start() throws IOException { initAndRecover(); } private void initAndRecover() throws IOException { if (!storageDir.exists()) { if (!storageDir.mkdirs()) { - logger.warn("Unable to create consensus dir at {}", storageDir); + throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); } } else { try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) { @@ -95,8 +98,8 @@ class SimpleConsensus implements IConsensus { ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.create( Integer.parseInt(items[0]), Integer.parseInt(items[1])); - SimpleServerImpl consensus = - new SimpleServerImpl( + SimpleConsensusServerImpl consensus = + new SimpleConsensusServerImpl( new Peer(consensusGroupId, thisNodeId, thisNode), registry.apply(consensusGroupId)); stateMachineMap.put(consensusGroupId, consensus); @@ -107,24 +110,20 @@ class SimpleConsensus implements IConsensus { } @Override - public void stop() throws IOException { - stateMachineMap.values().parallelStream().forEach(SimpleServerImpl::stop); + public synchronized void stop() throws IOException { + stateMachineMap.values().parallelStream().forEach(SimpleConsensusServerImpl::stop); } @Override - public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) { - SimpleServerImpl impl = stateMachineMap.get(groupId); - if (impl == null) { - return ConsensusWriteResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); - } - - TSStatus status; + public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) + throws ConsensusException { + SimpleConsensusServerImpl impl = + Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); if (impl.isReadOnly()) { - status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()); - status.setMessage("Fail to do non-query operations because system is read-only."); + return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY); } else { + TSStatus status; if (groupId instanceof DataRegionId) { long startWriteTime = System.nanoTime(); status = impl.write(request); @@ -133,58 +132,59 @@ class SimpleConsensus implements IConsensus { } else { status = impl.write(request); } + return status; } - return ConsensusWriteResponse.newBuilder().setStatus(status).build(); } @Override - public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) { - SimpleServerImpl impl = stateMachineMap.get(groupId); - if (impl == null) { - return ConsensusReadResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); - } - return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build(); + public DataSet read(ConsensusGroupId groupId, IConsensusRequest request) + throws ConsensusException { + return Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)) + .read(request); } + @SuppressWarnings("java:S2201") @Override - public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) { + public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) + throws ConsensusException { int consensusGroupSize = peers.size(); if (consensusGroupSize != 1) { - return ConsensusGenericResponse.newBuilder() - .setException(new IllegalPeerNumException(consensusGroupSize)) - .build(); + throw new IllegalPeerNumException(consensusGroupSize); } if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) { - return ConsensusGenericResponse.newBuilder() - .setException(new IllegalPeerEndpointException(thisNode, peers)) - .build(); + throw new IllegalPeerEndpointException(thisNode, peers); } AtomicBoolean exist = new AtomicBoolean(true); - stateMachineMap.computeIfAbsent( - groupId, - k -> { - exist.set(false); - SimpleServerImpl impl = new SimpleServerImpl(peers.get(0), registry.apply(groupId)); - impl.start(); - String path = buildPeerDir(groupId); - File file = new File(path); - if (!file.mkdirs()) { - logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); - } - return impl; - }); + Optional.ofNullable( + stateMachineMap.computeIfAbsent( + groupId, + k -> { + exist.set(false); + + String path = buildPeerDir(groupId); + File file = new File(path); + if (!file.mkdirs()) { + logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); + return null; + } + + SimpleConsensusServerImpl impl = + new SimpleConsensusServerImpl(peers.get(0), registry.apply(groupId)); + impl.start(); + return impl; + })) + .orElseThrow( + () -> + new ConsensusException( + String.format("Unable to create consensus dir for group %s", groupId))); if (exist.get()) { - return ConsensusGenericResponse.newBuilder() - .setException(new ConsensusGroupAlreadyExistException(groupId)) - .build(); + throw new ConsensusGroupAlreadyExistException(groupId); } - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } @Override - public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) { + public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException { AtomicBoolean exist = new AtomicBoolean(false); stateMachineMap.computeIfPresent( groupId, @@ -194,43 +194,29 @@ class SimpleConsensus implements IConsensus { FileUtils.deleteDirectory(new File(buildPeerDir(groupId))); return null; }); - if (!exist.get()) { - return ConsensusGenericResponse.newBuilder() - .setException(new ConsensusGroupNotExistException(groupId)) - .build(); + throw new ConsensusGroupNotExistException(groupId); } - return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); - } - - @Override - public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); - } - - @Override - public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); } @Override - public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); + public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException { + throw new ConsensusException("SimpleConsensus does not support membership changes"); } @Override - public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); + public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException { + throw new ConsensusException("SimpleConsensus does not support membership changes"); } @Override - public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); + public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException { + throw new ConsensusException("SimpleConsensus does not support leader transfer"); } @Override - public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) { - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); + public void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException { + throw new ConsensusException("SimpleConsensus does not support snapshot trigger currently"); } @Override diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java similarity index 94% rename from iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java rename to iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java index 3f2b68857cb..b4d8fe87040 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java @@ -27,12 +27,12 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest; import java.io.File; -public class SimpleServerImpl implements IStateMachine { +public class SimpleConsensusServerImpl implements IStateMachine { private final Peer peer; private final IStateMachine stateMachine; - public SimpleServerImpl(Peer peer, IStateMachine stateMachine) { + public SimpleConsensusServerImpl(Peer peer, IStateMachine stateMachine) { this.peer = peer; this.stateMachine = stateMachine; } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index 916f91f3ac5..8b95ae0d4d0 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -159,9 +159,9 @@ public class ReplicateTest { @Test public void replicateUsingQueueTest() throws IOException, InterruptedException { logger.info("Start ReplicateUsingQueueTest"); - servers.get(0).createPeer(group.getGroupId(), group.getPeers()); - servers.get(1).createPeer(group.getGroupId(), group.getPeers()); - servers.get(2).createPeer(group.getGroupId(), group.getPeers()); + servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers()); Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex()); Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex()); @@ -237,8 +237,8 @@ public class ReplicateTest { @Test public void replicateUsingWALTest() throws IOException, InterruptedException { logger.info("Start ReplicateUsingWALTest"); - servers.get(0).createPeer(group.getGroupId(), group.getPeers()); - servers.get(1).createPeer(group.getGroupId(), group.getPeers()); + servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex()); Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex()); @@ -256,7 +256,7 @@ public class ReplicateTest { stopServer(); initServer(); - servers.get(2).createPeer(group.getGroupId(), group.getPeers()); + servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers()); Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration()); Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration()); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 5afee66ee1a..c4333c860ea 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -90,11 +90,11 @@ public class StabilityTest { } public void peerTest() throws Exception { - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); - consensusImpl.deletePeer(dataRegionId); + consensusImpl.deleteLocalPeer(dataRegionId); consensusImpl.stop(); consensusImpl = null; @@ -102,16 +102,16 @@ public class StabilityTest { constructConsensus(); ConsensusGenericResponse response = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList( new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); Assert.assertTrue(response.isSuccess()); - consensusImpl.deletePeer(dataRegionId); + consensusImpl.deleteLocalPeer(dataRegionId); } public void snapshotTest() throws IOException { - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); consensusImpl.triggerSnapshot(dataRegionId); @@ -132,11 +132,11 @@ public class StabilityTest { Assert.assertNotNull(versionFiles2); Assert.assertEquals(1, versionFiles2.length); Assert.assertNotEquals(versionFiles1[0].getName(), versionFiles2[0].getName()); - consensusImpl.deletePeer(dataRegionId); + consensusImpl.deleteLocalPeer(dataRegionId); } public void snapshotUpgradeTest() throws Exception { - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); consensusImpl.triggerSnapshot(dataRegionId); @@ -166,6 +166,6 @@ public class StabilityTest { Assert.assertEquals( oldSnapshotIndex + 1, Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", ""))); - consensusImpl.deletePeer(dataRegionId); + consensusImpl.deleteLocalPeer(dataRegionId); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index 245474c10e5..d9cd902538a 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -101,9 +101,9 @@ public class RatisConsensusTest { @Test public void basicConsensus3Copy() throws Exception { - servers.get(0).createPeer(group.getGroupId(), group.getPeers()); - servers.get(1).createPeer(group.getGroupId(), group.getPeers()); - servers.get(2).createPeer(group.getGroupId(), group.getPeers()); + servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers()); doConsensus(servers.get(0), group.getGroupId(), 10, 10); } @@ -112,19 +112,19 @@ public class RatisConsensusTest { public void addMemberToGroup() throws Exception { List<Peer> original = peers.subList(0, 1); - servers.get(0).createPeer(group.getGroupId(), original); + servers.get(0).createLocalPeer(group.getGroupId(), original); doConsensus(servers.get(0), group.getGroupId(), 10, 10); - ConsensusGenericResponse resp = servers.get(0).createPeer(group.getGroupId(), original); + ConsensusGenericResponse resp = servers.get(0).createLocalPeer(group.getGroupId(), original); Assert.assertFalse(resp.isSuccess()); Assert.assertTrue(resp.getException() instanceof RatisRequestFailedException); // add 2 members - servers.get(1).createPeer(group.getGroupId(), Collections.emptyList()); - servers.get(0).addPeer(group.getGroupId(), peers.get(1)); + servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList()); + servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1)); - servers.get(2).createPeer(group.getGroupId(), Collections.emptyList()); - servers.get(0).changePeer(group.getGroupId(), peers); + servers.get(2).createLocalPeer(group.getGroupId(), Collections.emptyList()); + servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2)); Assert.assertEquals( 3, ((TestUtils.IntegerCounter) stateMachines.get(0)).getConfiguration().size()); @@ -133,39 +133,39 @@ public class RatisConsensusTest { @Test public void removeMemberFromGroup() throws Exception { - servers.get(0).createPeer(group.getGroupId(), group.getPeers()); - servers.get(1).createPeer(group.getGroupId(), group.getPeers()); - servers.get(2).createPeer(group.getGroupId(), group.getPeers()); + servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers()); doConsensus(servers.get(0), group.getGroupId(), 10, 10); servers.get(0).transferLeader(gid, peers.get(0)); - servers.get(0).removePeer(gid, peers.get(1)); - servers.get(1).deletePeer(gid); - servers.get(0).removePeer(gid, peers.get(2)); - servers.get(2).deletePeer(gid); + servers.get(0).removeRemotePeer(gid, peers.get(1)); + servers.get(1).deleteLocalPeer(gid); + servers.get(0).removeRemotePeer(gid, peers.get(2)); + servers.get(2).deleteLocalPeer(gid); doConsensus(servers.get(0), group.getGroupId(), 10, 20); } @Test public void oneMemberGroupChange() throws Exception { - servers.get(0).createPeer(group.getGroupId(), peers.subList(0, 1)); + servers.get(0).createLocalPeer(group.getGroupId(), peers.subList(0, 1)); doConsensus(servers.get(0), group.getGroupId(), 10, 10); - servers.get(1).createPeer(group.getGroupId(), Collections.emptyList()); - servers.get(0).addPeer(group.getGroupId(), peers.get(1)); + servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList()); + servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1)); servers.get(1).transferLeader(group.getGroupId(), peers.get(1)); - servers.get(0).removePeer(group.getGroupId(), peers.get(0)); + servers.get(0).removeRemotePeer(group.getGroupId(), peers.get(0)); Assert.assertEquals(servers.get(1).getLeader(gid).getNodeId(), peers.get(1).getNodeId()); - servers.get(0).deletePeer(group.getGroupId()); + servers.get(0).deleteLocalPeer(group.getGroupId()); } @Test public void crashAndStart() throws Exception { - servers.get(0).createPeer(group.getGroupId(), group.getPeers()); - servers.get(1).createPeer(group.getGroupId(), group.getPeers()); - servers.get(2).createPeer(group.getGroupId(), group.getPeers()); + servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers()); // 200 operation will trigger snapshot & purge doConsensus(servers.get(0), group.getGroupId(), 200, 200); @@ -178,9 +178,9 @@ public class RatisConsensusTest { // FIXME: Turn on the test when it is stable public void transferLeader() throws Exception { - servers.get(0).createPeer(group.getGroupId(), group.getPeers()); - servers.get(1).createPeer(group.getGroupId(), group.getPeers()); - servers.get(2).createPeer(group.getGroupId(), group.getPeers()); + servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); + servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers()); doConsensus(servers.get(0), group.getGroupId(), 10, 10); @@ -198,13 +198,13 @@ public class RatisConsensusTest { @Test public void transferSnapshot() throws Exception { - servers.get(0).createPeer(gid, peers.subList(0, 1)); + servers.get(0).createLocalPeer(gid, peers.subList(0, 1)); doConsensus(servers.get(0), gid, 10, 10); Assert.assertTrue(servers.get(0).triggerSnapshot(gid).isSuccess()); - servers.get(1).createPeer(gid, Collections.emptyList()); - servers.get(0).addPeer(gid, peers.get(1)); + servers.get(1).createLocalPeer(gid, Collections.emptyList()); + servers.get(0).addRemotePeer(gid, peers.get(1)); doConsensus(servers.get(1), gid, 10, 20); } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java index 001bb95de0a..c90d5d9952d 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java @@ -123,7 +123,7 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createPeer(gid, members)); + miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); // first write 10 ops TestUtils.write(miniCluster.getServer(0), gid, 10); @@ -150,7 +150,7 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createPeer(gid, members)); + miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); // first write 10 ops TestUtils.write(miniCluster.getServer(0), gid, 10); @@ -192,7 +192,7 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createPeer(gid, members)); + miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); // first write 10 ops TestUtils.write(miniCluster.getServer(0), gid, 10); @@ -218,7 +218,7 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createPeer(gid, members)); + miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); // first write 30 ops TestUtils.write(miniCluster.getServer(0), gid, 50); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java index b70499eb882..5fe3928925e 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java @@ -78,7 +78,7 @@ public class RecoveryTest { @Test public void recoveryTest() throws Exception { - consensusImpl.createPeer( + consensusImpl.createLocalPeer( schemaRegionId, Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 9000)))); @@ -88,7 +88,7 @@ public class RecoveryTest { constructConsensus(); ConsensusGenericResponse response = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( schemaRegionId, Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 9000)))); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index 6c065f76a4c..5cd64edf167 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -167,21 +167,21 @@ public class SimpleConsensusTest { @Test public void addConsensusGroup() { ConsensusGenericResponse response1 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); assertTrue(response1.isSuccess()); assertNull(response1.getException()); ConsensusGenericResponse response2 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); assertFalse(response2.isSuccess()); assertTrue(response2.getException() instanceof ConsensusGroupAlreadyExistException); ConsensusGenericResponse response3 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Arrays.asList( new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)), @@ -190,14 +190,14 @@ public class SimpleConsensusTest { assertTrue(response3.getException() instanceof IllegalPeerNumException); ConsensusGenericResponse response4 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.1", 6667)))); assertFalse(response4.isSuccess()); assertTrue(response4.getException() instanceof IllegalPeerEndpointException); ConsensusGenericResponse response5 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( schemaRegionId, Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); assertTrue(response5.isSuccess()); @@ -206,18 +206,18 @@ public class SimpleConsensusTest { @Test public void removeConsensusGroup() { - ConsensusGenericResponse response1 = consensusImpl.deletePeer(dataRegionId); + ConsensusGenericResponse response1 = consensusImpl.deleteLocalPeer(dataRegionId); assertFalse(response1.isSuccess()); assertTrue(response1.getException() instanceof ConsensusGroupNotExistException); ConsensusGenericResponse response2 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); assertTrue(response2.isSuccess()); assertNull(response2.getException()); - ConsensusGenericResponse response3 = consensusImpl.deletePeer(dataRegionId); + ConsensusGenericResponse response3 = consensusImpl.deleteLocalPeer(dataRegionId); assertTrue(response3.isSuccess()); assertNull(response3.getException()); } @@ -225,7 +225,7 @@ public class SimpleConsensusTest { @Test public void addPeer() { ConsensusGenericResponse response = - consensusImpl.addPeer( + consensusImpl.addRemotePeer( dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))); assertFalse(response.isSuccess()); } @@ -233,20 +233,11 @@ public class SimpleConsensusTest { @Test public void removePeer() { ConsensusGenericResponse response = - consensusImpl.removePeer( + consensusImpl.removeRemotePeer( dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))); assertFalse(response.isSuccess()); } - @Test - public void changePeer() { - ConsensusGenericResponse response = - consensusImpl.changePeer( - dataRegionId, - Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertFalse(response.isSuccess()); - } - @Test public void transferLeader() { ConsensusGenericResponse response = @@ -264,21 +255,21 @@ public class SimpleConsensusTest { @Test public void write() { ConsensusGenericResponse response1 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); assertTrue(response1.isSuccess()); assertNull(response1.getException()); ConsensusGenericResponse response2 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( schemaRegionId, Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); assertTrue(response2.isSuccess()); assertNull(response2.getException()); ConsensusGenericResponse response3 = - consensusImpl.createPeer( + consensusImpl.createLocalPeer( configId, Collections.singletonList(new Peer(configId, 1, new TEndPoint("0.0.0.0", 6667)))); assertTrue(response3.isSuccess()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 33c6cfcee4d..ff606c63a32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1415,7 +1415,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId); if (consensusGroupId instanceof DataRegionId) { ConsensusGenericResponse response = - DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId); + DataRegionConsensusImpl.getInstance().deleteLocalPeer(consensusGroupId); if (!response.isSuccess() && !(response.getException() instanceof PeerNotInConsensusGroupException)) { return RpcUtils.getStatus( @@ -1424,7 +1424,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface return regionManager.deleteDataRegion((DataRegionId) consensusGroupId); } else { ConsensusGenericResponse response = - SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId); + SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(consensusGroupId); if (!response.isSuccess() && !(response.getException() instanceof PeerNotInConsensusGroupException)) { return RpcUtils.getStatus( @@ -1755,9 +1755,9 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); ConsensusGenericResponse resp; if (regionId instanceof DataRegionId) { - resp = DataRegionConsensusImpl.getInstance().createPeer(regionId, peers); + resp = DataRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers); } else { - resp = SchemaRegionConsensusImpl.getInstance().createPeer(regionId, peers); + resp = SchemaRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers); } if (!resp.isSuccess()) { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java index 55c4e3bb87d..fdff724a3dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java @@ -119,7 +119,7 @@ public class DataNodeRegionManager { peers.add(new Peer(schemaRegionId, dataNodeLocation.getDataNodeId(), endpoint)); } ConsensusGenericResponse consensusGenericResponse = - SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId, peers); + SchemaRegionConsensusImpl.getInstance().createLocalPeer(schemaRegionId, peers); if (consensusGenericResponse.isSuccess()) { tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } else if (consensusGenericResponse.getException() @@ -160,7 +160,7 @@ public class DataNodeRegionManager { peers.add(new Peer(dataRegionId, dataNodeLocation.getDataNodeId(), endpoint)); } ConsensusGenericResponse consensusGenericResponse = - DataRegionConsensusImpl.getInstance().createPeer(dataRegionId, peers); + DataRegionConsensusImpl.getInstance().createLocalPeer(dataRegionId, peers); if (consensusGenericResponse.isSuccess()) { tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } else if (consensusGenericResponse.getException() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index 1b94e607282..05ef52758e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; @@ -163,28 +164,27 @@ public class RegionWriteExecutor { return response; } - ConsensusWriteResponse writeResponse = - executePlanNodeInConsensusLayer(context.getRegionId(), node); - if (writeResponse.getStatus() != null) { + try { + TSStatus status = executePlanNodeInConsensusLayer(context.getRegionId(), node); response.setAccepted( - TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode()); - response.setMessage(writeResponse.getStatus().message); - response.setStatus(writeResponse.getStatus()); - } else { + TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); + response.setMessage(status.getMessage()); + response.setStatus(status); + } catch (ConsensusException e) { LOGGER.error( "Something wrong happened while calling consensus layer's write API.", - writeResponse.getException()); + e); response.setAccepted(false); - response.setMessage(writeResponse.getException().toString()); + response.setMessage(e.toString()); response.setStatus( RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, writeResponse.getErrorMessage())); + TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage())); } return response; } - private ConsensusWriteResponse executePlanNodeInConsensusLayer( - ConsensusGroupId groupId, PlanNode planNode) { + private TSStatus executePlanNodeInConsensusLayer( + ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException { if (groupId instanceof DataRegionId) { return dataRegionConsensus.write(groupId, planNode); } else { @@ -227,72 +227,60 @@ public class RegionWriteExecutor { RegionExecutionResult response = new RegionExecutionResult(); context.getRegionWriteValidationRWLock().readLock().lock(); try { - - ConsensusWriteResponse writeResponse = + TSStatus status = fireTriggerAndInsert(context.getRegionId(), insertNode); - - if (writeResponse.getStatus() != null) { - response.setAccepted( - TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode()); - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != writeResponse.getStatus().getCode()) { - response.setMessage(writeResponse.getStatus().message); - response.setStatus(writeResponse.getStatus()); - } else { - response.setMessage(writeResponse.getStatus().message); - } - } else { - LOGGER.warn( - "Something wrong happened while calling consensus layer's write API.", - writeResponse.getException()); - response.setAccepted(false); - response.setMessage(writeResponse.getException().toString()); - response.setStatus( - RpcUtils.getStatus( - TSStatusCode.WRITE_PROCESS_ERROR, writeResponse.getException().toString())); + response.setAccepted( + TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); + response.setMessage(status.message); + if (!response.isAccepted()) { + response.setStatus(status); } - + return response; + } catch (ConsensusException e) { + LOGGER.warn( + "Something wrong happened while calling consensus layer's write API.", + e); + response.setAccepted(false); + response.setMessage(e.toString()); + response.setStatus( + RpcUtils.getStatus( + TSStatusCode.WRITE_PROCESS_ERROR, e.toString())); return response; } finally { context.getRegionWriteValidationRWLock().readLock().unlock(); } } - private ConsensusWriteResponse fireTriggerAndInsert( - ConsensusGroupId groupId, PlanNode planNode) { + private TSStatus fireTriggerAndInsert( + ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException { long triggerCostTime = 0; - ConsensusWriteResponse writeResponse; + TSStatus status; long startTime = System.nanoTime(); // fire Trigger before the insertion TriggerFireResult result = triggerFireVisitor.process(planNode, TriggerEvent.BEFORE_INSERT); triggerCostTime += (System.nanoTime() - startTime); if (result.equals(TriggerFireResult.TERMINATION)) { - TSStatus triggerError = new TSStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode()); - triggerError.setMessage( + status = RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), "Failed to complete the insertion because trigger error before the insertion."); - writeResponse = ConsensusWriteResponse.newBuilder().setStatus(triggerError).build(); } else { boolean hasFailedTriggerBeforeInsertion = result.equals(TriggerFireResult.FAILED_NO_TERMINATION); long startWriteTime = System.nanoTime(); - writeResponse = dataRegionConsensus.write(groupId, planNode); + status = dataRegionConsensus.write(groupId, planNode); PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() - startWriteTime); // fire Trigger after the insertion - if (writeResponse.isSuccessful()) { - startTime = System.nanoTime(); - result = triggerFireVisitor.process(planNode, TriggerEvent.AFTER_INSERT); - if (hasFailedTriggerBeforeInsertion || !result.equals(TriggerFireResult.SUCCESS)) { - TSStatus triggerError = new TSStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode()); - triggerError.setMessage( - "Meet trigger error before/after the insertion, the insertion itself is completed."); - writeResponse = ConsensusWriteResponse.newBuilder().setStatus(triggerError).build(); - } - triggerCostTime += (System.nanoTime() - startTime); + startTime = System.nanoTime(); + result = triggerFireVisitor.process(planNode, TriggerEvent.AFTER_INSERT); + if (hasFailedTriggerBeforeInsertion || !result.equals(TriggerFireResult.SUCCESS)) { + status = RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), + "Meet trigger error before/after the insertion, the insertion itself is completed."); } + triggerCostTime += (System.nanoTime() - startTime); } PERFORMANCE_OVERVIEW_METRICS.recordScheduleTriggerCost(triggerCostTime); - return writeResponse; + return status; } @Override @@ -804,7 +792,7 @@ public class RegionWriteExecutor { String.format( "Template is being unsetting from prefix path of %s. Please try activating later.", new PartialPath( - Arrays.copyOf(entry.getKey().getNodes(), entry.getValue().right + 1)) + Arrays.copyOf(entry.getKey().getNodes(), entry.getValue().right + 1)) .getFullPath()); result.setMessage(message); result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index 35b495c8c1b..b1829b810f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -265,9 +265,9 @@ public class RegionMigrateService implements IService { private ConsensusGenericResponse addRegionPeer(ConsensusGroupId regionId, Peer newPeer) { ConsensusGenericResponse resp; if (regionId instanceof DataRegionId) { - resp = DataRegionConsensusImpl.getInstance().addPeer(regionId, newPeer); + resp = DataRegionConsensusImpl.getInstance().addRemotePeer(regionId, newPeer); } else { - resp = SchemaRegionConsensusImpl.getInstance().addPeer(regionId, newPeer); + resp = SchemaRegionConsensusImpl.getInstance().addRemotePeer(regionId, newPeer); } return resp; } @@ -354,9 +354,9 @@ public class RegionMigrateService implements IService { private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) { ConsensusGenericResponse resp; if (regionId instanceof DataRegionId) { - resp = DataRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer); + resp = DataRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer); } else { - resp = SchemaRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer); + resp = SchemaRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer); } return resp; } @@ -409,9 +409,9 @@ public class RegionMigrateService implements IService { ConsensusGenericResponse resp; try { if (regionId instanceof DataRegionId) { - resp = DataRegionConsensusImpl.getInstance().deletePeer(regionId); + resp = DataRegionConsensusImpl.getInstance().deleteLocalPeer(regionId); } else { - resp = SchemaRegionConsensusImpl.getInstance().deletePeer(regionId); + resp = SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(regionId); } } catch (Throwable e) { taskLogger.error("{}, deletePeer error, regionId: {}", REGION_MIGRATE_PROCESS, regionId, e); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java index 8d5a9fff2c8..bc319d63525 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java @@ -90,7 +90,7 @@ public class DataNodeInternalRPCServiceImplTest { public void setUp() throws Exception { TRegionReplicaSet regionReplicaSet = genRegionReplicaSet(); SchemaRegionConsensusImpl.getInstance() - .createPeer( + .createLocalPeer( ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()), genSchemaRegionPeerList(regionReplicaSet)); dataNodeInternalRPCServiceImpl = new DataNodeInternalRPCServiceImpl(); @@ -100,7 +100,7 @@ public class DataNodeInternalRPCServiceImplTest { public void tearDown() throws Exception { TRegionReplicaSet regionReplicaSet = genRegionReplicaSet(); SchemaRegionConsensusImpl.getInstance() - .deletePeer( + .deleteLocalPeer( ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId())); FileUtils.deleteFully(new File(conf.getConsensusDir())); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index 6cb401b56fa..83f041b0f6e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -139,7 +139,7 @@ public class StatusUtils { status.setMessage("Meet error in close operation."); break; case SYSTEM_READ_ONLY: - status.setMessage("Database is read-only."); + status.setMessage("Fail to do non-query operations because system is read-only."); break; case DISK_SPACE_INSUFFICIENT: status.setMessage("Disk space is insufficient.");
