This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch issue_3076 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 870949223ba80c5f92d10798866f4edb84176c08 Author: LebronAl <[email protected]> AuthorDate: Mon May 2 16:57:13 2022 +0800 Optimize StandAloneConsensus read/write performance && consensus module code refactor --- .../statemachine/PartitionRegionStateMachine.java | 2 +- .../apache/iotdb/consensus/ConsensusFactory.java | 1 - .../{statemachine => }/IStateMachine.java | 5 +- .../exception/IllegalPeerEndpointException.java | 32 ++++++ .../ratis/ApplicationStateMachineProxy.java | 2 +- .../iotdb/consensus/ratis/RatisConsensus.java | 2 +- .../iotdb/consensus/ratis/RequestMessage.java | 4 - .../iotdb/consensus/ratis/SnapshotStorage.java | 4 +- .../org/apache/iotdb/consensus/ratis/Utils.java | 3 +- .../consensus/standalone/StandAloneConsensus.java | 116 ++++++++++----------- .../consensus/standalone/StandAloneServerImpl.java | 14 ++- .../apache/iotdb/consensus}/EmptyStateMachine.java | 2 +- .../apache/iotdb/consensus/ratis/TestUtils.java | 2 +- .../iotdb/consensus/standalone/RecoveryTest.java | 4 +- .../standalone/StandAloneConsensusTest.java | 18 +++- .../iotdb/commons/consensus/ConsensusGroupId.java | 69 +++++++----- .../iotdb/commons/consensus/DataRegionId.java | 39 +------ .../iotdb/commons/consensus/PartitionRegionId.java | 39 +------ .../iotdb/commons/consensus/SchemaRegionId.java | 39 +------ .../apache/iotdb/commons/ConsensusGroupIdTest.java | 4 +- .../consensus/statemachine/BaseStateMachine.java | 2 +- .../service/thrift/impl/InternalServiceImpl.java | 2 +- .../iotdb/db/service/InternalServiceImplTest.java | 4 +- 23 files changed, 172 insertions(+), 237 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java index fa14e859f0..e429f61f20 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java @@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.consensus.request.ConfigRequest; import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException; import org.apache.iotdb.confignode.service.executor.ConfigRequestExecutor; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.SnapshotMeta; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java index d9364ad13e..efc40e31e6 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java @@ -20,7 +20,6 @@ package org.apache.iotdb.consensus; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java similarity index 97% rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java rename to consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java index aab3e17bcd..c090fa2a35 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.consensus.statemachine; +package org.apache.iotdb.consensus; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -25,10 +25,13 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.SnapshotMeta; import org.apache.iotdb.consensus.common.request.IConsensusRequest; +import javax.annotation.concurrent.ThreadSafe; + import java.io.File; import java.nio.ByteBuffer; import java.util.function.Function; +@ThreadSafe public interface IStateMachine { interface Registry extends Function<ConsensusGroupId, IStateMachine> {} diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java new file mode 100644 index 0000000000..7591345acc --- /dev/null +++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.exception; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; + +public class IllegalPeerEndpointException extends ConsensusException { + + public IllegalPeerEndpointException(TEndPoint currentNode, TEndPoint newNode) { + super( + String.format( + "Illegal creation for node %s in node %s in StandAloneConsensus Mode", + newNode, currentNode)); + } +} diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index 53113293a0..317029a740 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -19,11 +19,11 @@ package org.apache.iotdb.consensus.ratis; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.SnapshotMeta; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.Message; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index d022f096cc..f8d65fa0b0 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.IClientPoolFactory; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.IConsensus; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; @@ -39,7 +40,6 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; import org.apache.iotdb.consensus.exception.RatisRequestFailedException; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java index 9dc73d815c..242573e5be 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java @@ -23,15 +23,11 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.ratis.protocol.Message; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; public class RequestMessage implements Message { - private final Logger logger = LoggerFactory.getLogger(RequestMessage.class); - private final IConsensusRequest actualRequest; private volatile ByteString serializedContent; private static final int DEFAULT_BUFFER_SIZE = 2048 * 10; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java index 2fbc7916ca..c50a4b3446 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java @@ -18,8 +18,8 @@ */ package org.apache.iotdb.consensus.ratis; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.SnapshotMeta; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.apache.ratis.io.MD5Hash; import org.apache.ratis.server.protocol.TermIndex; @@ -46,7 +46,7 @@ import java.util.List; * hopefully will be introduced in Ratis 2.3.0. */ public class SnapshotStorage implements StateMachineStorage { - private IStateMachine applicationStateMachine; + private final IStateMachine applicationStateMachine; private File stateMachineDir; private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java index f1842a6dea..96674e5592 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java @@ -166,8 +166,7 @@ public class Utils { public static ByteBuffer getMetadataFromTermIndex(TermIndex termIndex) { String ordinal = String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex()); - ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes()); - return metadata; + return ByteBuffer.wrap(ordinal.getBytes()); } public static TermIndex getTermIndexFromMetadata(ByteBuffer metadata) { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java index f0d15221f2..8d79c1b3fb 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java @@ -21,10 +21,10 @@ package org.apache.iotdb.consensus.standalone; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.IConsensus; -import org.apache.iotdb.consensus.common.DataSet; +import org.apache.iotdb.consensus.IStateMachine; +import org.apache.iotdb.consensus.IStateMachine.Registry; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; @@ -32,9 +32,11 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; +import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; import org.apache.iotdb.consensus.exception.IllegalPeerNumException; -import org.apache.iotdb.consensus.statemachine.IStateMachine; -import org.apache.iotdb.consensus.statemachine.IStateMachine.Registry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -43,20 +45,19 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * A simple consensus implementation, which can be used when replicaNum is 1. * * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart - * - * <p>any module can use `IConsensus consensusImpl = new StandAloneConsensus(id -> new - * EmptyStateMachine());` to perform an initialization implementation. */ class StandAloneConsensus implements IConsensus { + private final Logger logger = LoggerFactory.getLogger(StandAloneConsensus.class); + private final TEndPoint thisNode; private final File storageDir; private final IStateMachine.Registry registry; @@ -71,16 +72,21 @@ class StandAloneConsensus implements IConsensus { @Override public void start() throws IOException { - if (!this.storageDir.exists()) { - storageDir.mkdirs(); + initAndRecover(); + } + + private void initAndRecover() throws IOException { + if (!storageDir.exists()) { + if (!storageDir.mkdirs()) { + logger.warn("Unable to create consensus dir at {}", storageDir); + } } else { try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) { for (Path path : stream) { - String filename = path.getFileName().toString(); - String[] items = filename.split("_"); - TConsensusGroupType type = TConsensusGroupType.valueOf(items[0]); - ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createEmpty(type); - consensusGroupId.setId(Integer.parseInt(items[1])); + String[] items = path.getFileName().toString().split("_"); + ConsensusGroupId consensusGroupId = + ConsensusGroupId.Factory.create( + TConsensusGroupType.valueOf(items[0]), Integer.parseInt(items[1])); TEndPoint endPoint = new TEndPoint(items[2], Integer.parseInt(items[3])); stateMachineMap.put( consensusGroupId, @@ -96,40 +102,24 @@ class StandAloneConsensus implements IConsensus { @Override public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) { - AtomicReference<TSStatus> result = new AtomicReference<>(); - stateMachineMap.computeIfPresent( - groupId, - (k, v) -> { - // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect - // performance - result.set(v.write(request)); - return v; - }); - if (result.get() == null) { + StandAloneServerImpl impl = stateMachineMap.get(groupId); + if (impl == null) { return ConsensusWriteResponse.newBuilder() .setException(new ConsensusGroupNotExistException(groupId)) .build(); } - return ConsensusWriteResponse.newBuilder().setStatus(result.get()).build(); + return ConsensusWriteResponse.newBuilder().setStatus(impl.write(request)).build(); } @Override public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) { - AtomicReference<DataSet> result = new AtomicReference<>(); - stateMachineMap.computeIfPresent( - groupId, - (k, v) -> { - // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect - // performance - result.set(v.read(request)); - return v; - }); - if (result.get() == null) { + StandAloneServerImpl impl = stateMachineMap.get(groupId); + if (impl == null) { return ConsensusReadResponse.newBuilder() .setException(new ConsensusGroupNotExistException(groupId)) .build(); } - return ConsensusReadResponse.newBuilder().setDataSet(result.get()).build(); + return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build(); } @Override @@ -140,6 +130,11 @@ class StandAloneConsensus implements IConsensus { .setException(new IllegalPeerNumException(consensusGroupSize)) .build(); } + if (!Objects.equals(thisNode, peers.get(0).getEndpoint())) { + return ConsensusGenericResponse.newBuilder() + .setException(new IllegalPeerEndpointException(thisNode, peers.get(0).getEndpoint())) + .build(); + } AtomicBoolean exist = new AtomicBoolean(true); stateMachineMap.computeIfAbsent( groupId, @@ -148,19 +143,11 @@ class StandAloneConsensus implements IConsensus { StandAloneServerImpl impl = new StandAloneServerImpl(peers.get(0), registry.apply(groupId)); impl.start(); - String groupPath = - storageDir - + File.separator - + groupId.getType() - + "_" - + groupId.getId() - + "_" - + peers.get(0).getEndpoint().ip - + "_" - + peers.get(0).getEndpoint().port; - File file = new File(groupPath); - file.mkdirs(); - + String path = buildPeerDir(groupId); + File file = new File(path); + if (!file.mkdirs()) { + logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); + } return impl; }); if (exist.get()) { @@ -177,20 +164,13 @@ class StandAloneConsensus implements IConsensus { stateMachineMap.computeIfPresent( groupId, (k, v) -> { - String groupPath = - storageDir - + File.separator - + groupId.getType() - + "_" - + groupId.getId() - + "_" - + thisNode.ip - + "_" - + thisNode.port; - File file = new File(groupPath); - file.delete(); exist.set(true); v.stop(); + String path = buildPeerDir(groupId); + File file = new File(path); + if (!file.delete()) { + logger.warn("Unable to delete consensus dir for group {} at {}", groupId, path); + } return null; }); @@ -239,4 +219,16 @@ class StandAloneConsensus implements IConsensus { } return new Peer(groupId, thisNode); } + + private String buildPeerDir(ConsensusGroupId groupId) { + return storageDir + + File.separator + + groupId.getType() + + "_" + + groupId.getId() + + "_" + + thisNode.getIp() + + "_" + + thisNode.getPort(); + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java index be0dbaa6c7..4c55e9676b 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java @@ -20,11 +20,11 @@ package org.apache.iotdb.consensus.standalone; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.SnapshotMeta; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import java.io.File; import java.nio.ByteBuffer; @@ -69,17 +69,21 @@ public class StandAloneServerImpl implements IStateMachine { @Override public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) { - return false; + return stateMachine.takeSnapshot(metadata, snapshotDir); } @Override public SnapshotMeta getLatestSnapshot(File snapshotDir) { - return null; + return stateMachine.getLatestSnapshot(snapshotDir); } @Override - public void loadSnapshot(SnapshotMeta latest) {} + public void loadSnapshot(SnapshotMeta latest) { + stateMachine.loadSnapshot(latest); + } @Override - public void cleanUpOldSnapshots(File snapshotDir) {} + public void cleanUpOldSnapshots(File snapshotDir) { + stateMachine.cleanUpOldSnapshots(snapshotDir); + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java similarity index 97% rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java rename to consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java index 8693d1f6ce..118b057d1a 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.consensus.statemachine; +package org.apache.iotdb.consensus; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.consensus.common.DataSet; diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java index fe209ba019..962dd84d95 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java @@ -19,11 +19,11 @@ package org.apache.iotdb.consensus.ratis; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.SnapshotMeta; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.apache.ratis.util.FileUtils; import org.slf4j.Logger; diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java index 0c9f0e74f6..a166893b6b 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java @@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.EmptyStateMachine; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; -import org.apache.iotdb.consensus.statemachine.EmptyStateMachine; import org.apache.ratis.util.FileUtils; import org.junit.After; @@ -48,7 +48,7 @@ public class RecoveryTest { consensusImpl = ConsensusFactory.getConsensusImpl( STANDALONE_CONSENSUS_CLASS_NAME, - new TEndPoint("localhost", 9000), + new TEndPoint("0.0.0.0", 9000), new File("./target/recovery"), gid -> new EmptyStateMachine()) .orElseThrow( diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java index 986e347356..1222598444 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java @@ -26,7 +26,9 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.PartitionRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.EmptyStateMachine; import org.apache.iotdb.consensus.IConsensus; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.SnapshotMeta; @@ -36,9 +38,8 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; +import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; import org.apache.iotdb.consensus.exception.IllegalPeerNumException; -import org.apache.iotdb.consensus.statemachine.EmptyStateMachine; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.apache.ratis.util.FileUtils; import org.junit.After; @@ -134,7 +135,7 @@ public class StandAloneConsensusTest { consensusImpl = ConsensusFactory.getConsensusImpl( STANDALONE_CONSENSUS_CLASS_NAME, - new TEndPoint("localhost", 6667), + new TEndPoint("0.0.0.0", 6667), new File("./target/standalone"), gid -> { switch (gid.getType()) { @@ -186,11 +187,18 @@ public class StandAloneConsensusTest { assertTrue(response3.getException() instanceof IllegalPeerNumException); ConsensusGenericResponse response4 = + consensusImpl.addConsensusGroup( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, new TEndPoint("0.0.0.1", 6667)))); + assertFalse(response4.isSuccess()); + assertTrue(response4.getException() instanceof IllegalPeerEndpointException); + + ConsensusGenericResponse response5 = consensusImpl.addConsensusGroup( schemaRegionId, Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 6667)))); - assertTrue(response4.isSuccess()); - assertNull(response4.getException()); + assertTrue(response5.isSuccess()); + assertNull(response5.getException()); } @Test diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java index aee4bcee55..adb1824c35 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java @@ -22,53 +22,66 @@ package org.apache.iotdb.commons.consensus; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -public interface ConsensusGroupId { +import java.util.Objects; - // return specific id - int getId(); +// we abstract this class to hide word `ConsensusGroup` for IoTDB StorageEngine/SchemaEngine +public abstract class ConsensusGroupId { - void setId(int id); + protected int id; + + // return specific id + public int getId() { + return id; + } // return specific type - TConsensusGroupType getType(); + public abstract TConsensusGroupType getType(); - class Factory { - public static ConsensusGroupId createEmpty(TConsensusGroupType type) { + @Override + public int hashCode() { + return Objects.hash(getType(), getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ConsensusGroupId that = (ConsensusGroupId) o; + return getId() == that.getId() && getType() == that.getType(); + } + + @Override + public String toString() { + return String.format("%s[%d]", getType(), getId()); + } + + public static class Factory { + + public static ConsensusGroupId create(TConsensusGroupType type, int id) { ConsensusGroupId groupId; switch (type) { case DataRegion: - groupId = new DataRegionId(); + groupId = new DataRegionId(id); break; case SchemaRegion: - groupId = new SchemaRegionId(); + groupId = new SchemaRegionId(id); break; case PartitionRegion: - groupId = new PartitionRegionId(); + groupId = new PartitionRegionId(id); break; default: - throw new IllegalArgumentException("unrecognized id type " + type); + throw new IllegalArgumentException("unrecognized id type " + id); } return groupId; } - public static ConsensusGroupId convertFromTConsensusGroupId( + public static ConsensusGroupId createFromTConsensusGroupId( TConsensusGroupId tConsensusGroupId) { - ConsensusGroupId groupId = createEmpty(tConsensusGroupId.getType()); - groupId.setId(tConsensusGroupId.getId()); - return groupId; - } - - public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId consensusGroupId) { - TConsensusGroupId result = new TConsensusGroupId(); - if (consensusGroupId instanceof SchemaRegionId) { - result.setType(TConsensusGroupType.SchemaRegion); - } else if (consensusGroupId instanceof DataRegionId) { - result.setType(TConsensusGroupType.DataRegion); - } else if (consensusGroupId instanceof PartitionRegionId) { - result.setType(TConsensusGroupType.PartitionRegion); - } - result.setId(consensusGroupId.getId()); - return result; + return create(tConsensusGroupId.getType(), tConsensusGroupId.getId()); } } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java index 81fed3fe62..03ec5e4d7d 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java @@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import java.util.Objects; - -public class DataRegionId implements ConsensusGroupId { - - private int id; - - public DataRegionId() {} +public class DataRegionId extends ConsensusGroupId { public DataRegionId(int id) { this.id = id; } - @Override - public int getId() { - return id; - } - - @Override - public void setId(int id) { - this.id = id; - } - @Override public TConsensusGroupType getType() { return TConsensusGroupType.DataRegion; } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DataRegionId that = (DataRegionId) o; - return id == that.id; - } - - @Override - public int hashCode() { - return Objects.hash(id, TConsensusGroupType.DataRegion); - } - - public String toString() { - return String.format("%s[%d]", getType(), getId()); - } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java index 6f317547ed..82cb7a2fed 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java @@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import java.util.Objects; - -public class PartitionRegionId implements ConsensusGroupId { - - private int id; - - public PartitionRegionId() {} +public class PartitionRegionId extends ConsensusGroupId { public PartitionRegionId(int id) { this.id = id; } - @Override - public int getId() { - return id; - } - - @Override - public void setId(int id) { - this.id = id; - } - @Override public TConsensusGroupType getType() { return TConsensusGroupType.PartitionRegion; } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PartitionRegionId that = (PartitionRegionId) o; - return id == that.id; - } - - @Override - public int hashCode() { - return Objects.hash(id, TConsensusGroupType.PartitionRegion); - } - - public String toString() { - return String.format("%s[%d]", getType(), getId()); - } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java index 61c4ca80c5..48b9cbf820 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java @@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import java.util.Objects; - -public class SchemaRegionId implements ConsensusGroupId { - - private int id; - - public SchemaRegionId() {} +public class SchemaRegionId extends ConsensusGroupId { public SchemaRegionId(int id) { this.id = id; } - @Override - public int getId() { - return id; - } - - @Override - public void setId(int id) { - this.id = id; - } - @Override public TConsensusGroupType getType() { return TConsensusGroupType.SchemaRegion; } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SchemaRegionId that = (SchemaRegionId) o; - return id == that.id; - } - - @Override - public int hashCode() { - return Objects.hash(id, TConsensusGroupType.SchemaRegion); - } - - public String toString() { - return String.format("%s[%d]", getType(), getId()); - } } diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java index af19aea35e..f00b6dbb9c 100644 --- a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java +++ b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java @@ -34,14 +34,14 @@ public class ConsensusGroupIdTest { @Test public void TestCreate() throws IOException { ConsensusGroupId dataRegionId = - ConsensusGroupId.Factory.convertFromTConsensusGroupId( + ConsensusGroupId.Factory.createFromTConsensusGroupId( new TConsensusGroupId(TConsensusGroupType.DataRegion, 1)); Assert.assertTrue(dataRegionId instanceof DataRegionId); Assert.assertEquals(1, dataRegionId.getId()); Assert.assertEquals(TConsensusGroupType.DataRegion, dataRegionId.getType()); ConsensusGroupId schemaRegionId = - ConsensusGroupId.Factory.convertFromTConsensusGroupId( + ConsensusGroupId.Factory.createFromTConsensusGroupId( new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2)); Assert.assertTrue(schemaRegionId instanceof SchemaRegionId); Assert.assertEquals(2, schemaRegionId.getId()); diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java index 873d652013..a19291477e 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java @@ -20,10 +20,10 @@ package org.apache.iotdb.db.consensus.statemachine; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.statemachine.IStateMachine; import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java index 86810c8372..672b5206fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java @@ -89,7 +89,7 @@ public class InternalServiceImpl implements InternalService.Iface { public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) { QueryType type = QueryType.valueOf(req.queryType); ConsensusGroupId groupId = - ConsensusGroupId.Factory.convertFromTConsensusGroupId(req.getConsensusGroupId()); + ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); switch (type) { case READ: ConsensusReadResponse readResp = diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java index bdedbbf4d2..027260d071 100644 --- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java +++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java @@ -85,7 +85,7 @@ public class InternalServiceImplTest { TRegionReplicaSet regionReplicaSet = genRegionReplicaSet(); ConsensusImpl.getInstance() .addConsensusGroup( - ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId()), + ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()), genPeerList(regionReplicaSet)); internalServiceImpl = new InternalServiceImpl(); } @@ -95,7 +95,7 @@ public class InternalServiceImplTest { TRegionReplicaSet regionReplicaSet = genRegionReplicaSet(); ConsensusImpl.getInstance() .removeConsensusGroup( - ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId())); + ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId())); FileUtils.deleteFully(new File(conf.getConsensusDir())); }
