This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 344b0891ff7 [IOTDB-5985] Refactor RatisConsensus UT framework (#10110)
344b0891ff7 is described below
commit 344b0891ff757f2918b9eb6d42bc6827830bf640
Author: William Song <[email protected]>
AuthorDate: Mon Jun 19 00:34:45 2023 +0800
[IOTDB-5985] Refactor RatisConsensus UT framework (#10110)
---
.../iotdb/consensus/ratis/RatisConsensusTest.java | 153 ++++++--------------
.../apache/iotdb/consensus/ratis/TestUtils.java | 160 +++++++++++++++++++++
2 files changed, 206 insertions(+), 107 deletions(-)
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index cb86a0f79d9..f0c19f039f8 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -18,32 +18,25 @@
*/
package org.apache.iotdb.consensus.ratis;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
-import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -55,90 +48,54 @@ public class RatisConsensusTest {
private ConsensusGroupId gid;
private List<Peer> peers;
- private List<File> peersStorage;
- private List<IConsensus> servers;
- private List<TestUtils.IntegerCounter> stateMachines;
+ private List<RatisConsensus> servers;
+ private List<IStateMachine> stateMachines;
private ConsensusGroup group;
CountDownLatch latch;
- private void makeServers() throws IOException {
- for (int i = 0; i < 3; i++) {
- stateMachines.add(new TestUtils.IntegerCounter());
- RatisConfig config =
- RatisConfig.newBuilder()
- .setLog(
- RatisConfig.Log.newBuilder()
- .setPurgeUptoSnapshotIndex(true)
- .setPurgeGap(10)
- .setUnsafeFlushEnabled(false)
- .build())
- .setSnapshot(
- RatisConfig.Snapshot.newBuilder()
- .setAutoTriggerThreshold(100)
- .setCreationGap(10)
- .build())
- .setRpc(
- RatisConfig.Rpc.newBuilder()
- .setFirstElectionTimeoutMin(TimeDuration.valueOf(1,
TimeUnit.SECONDS))
- .setFirstElectionTimeoutMax(TimeDuration.valueOf(4,
TimeUnit.SECONDS))
- .setTimeoutMin(TimeDuration.valueOf(1, TimeUnit.SECONDS))
- .setTimeoutMax(TimeDuration.valueOf(4, TimeUnit.SECONDS))
- .build())
- .setImpl(
- RatisConfig.Impl.newBuilder()
- .setTriggerSnapshotFileSize(1)
- .setTriggerSnapshotTime(4)
- .build())
- .build();
- int finalI = i;
- servers.add(
- ConsensusFactory.getConsensusImpl(
- ConsensusFactory.RATIS_CONSENSUS,
- ConsensusConfig.newBuilder()
- .setThisNodeId(peers.get(i).getNodeId())
- .setThisNode(peers.get(i).getEndpoint())
- .setRatisConfig(config)
- .setStorageDir(peersStorage.get(i).getAbsolutePath())
- .build(),
- groupId -> stateMachines.get(finalI))
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(
- ConsensusFactory.CONSTRUCT_FAILED_MSG,
- ConsensusFactory.RATIS_CONSENSUS))));
- servers.get(i).start();
- }
- }
+ private TestUtils.MiniCluster miniCluster;
+
+ private final RatisConfig config =
+ RatisConfig.newBuilder()
+ .setLog(
+ RatisConfig.Log.newBuilder()
+ .setPurgeUptoSnapshotIndex(true)
+ .setPurgeGap(10)
+ .setUnsafeFlushEnabled(false)
+ .build())
+ .setSnapshot(
+ RatisConfig.Snapshot.newBuilder()
+ .setAutoTriggerThreshold(100)
+ .setCreationGap(10)
+ .build())
+ .setRpc(
+ RatisConfig.Rpc.newBuilder()
+ .setFirstElectionTimeoutMin(TimeDuration.valueOf(1,
TimeUnit.SECONDS))
+ .setFirstElectionTimeoutMax(TimeDuration.valueOf(4,
TimeUnit.SECONDS))
+ .setTimeoutMin(TimeDuration.valueOf(1, TimeUnit.SECONDS))
+ .setTimeoutMax(TimeDuration.valueOf(4, TimeUnit.SECONDS))
+ .build())
+ .setImpl(
+ RatisConfig.Impl.newBuilder()
+ .setTriggerSnapshotFileSize(1)
+ .setTriggerSnapshotTime(4)
+ .build())
+ .build();
@Before
public void setUp() throws IOException {
- gid = new DataRegionId(1);
- peers = new ArrayList<>();
- peers.add(new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)));
- peers.add(new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)));
- peers.add(new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002)));
- peersStorage = new ArrayList<>();
- peersStorage.add(new File("target" + java.io.File.separator + "1"));
- peersStorage.add(new File("target" + java.io.File.separator + "2"));
- peersStorage.add(new File("target" + java.io.File.separator + "3"));
- for (File dir : peersStorage) {
- dir.mkdirs();
- }
- group = new ConsensusGroup(gid, peers);
- servers = new ArrayList<>();
- stateMachines = new ArrayList<>();
- makeServers();
+ miniCluster = new
TestUtils.MiniClusterFactory().setRatisConfig(config).create();
+ miniCluster.start();
+ gid = miniCluster.getGid();
+ servers = miniCluster.getServers();
+ group = miniCluster.getGroup();
+ peers = miniCluster.getPeers();
+ stateMachines = miniCluster.getStateMachines();
}
@After
public void tearDown() throws IOException {
- for (int i = 0; i < 3; i++) {
- servers.get(i).stop();
- }
- for (File file : peersStorage) {
- FileUtils.deleteFully(file);
- }
+ miniCluster.cleanUp();
}
@Test
@@ -168,7 +125,8 @@ public class RatisConsensusTest {
servers.get(2).createPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).changePeer(group.getGroupId(), peers);
- Assert.assertEquals(stateMachines.get(0).getConfiguration().size(), 3);
+ Assert.assertEquals(
+ ((TestUtils.IntegerCounter)
stateMachines.get(0)).getConfiguration().size(), 3);
doConsensus(servers.get(0), group.getGroupId(), 10, 20);
}
@@ -190,23 +148,14 @@ public class RatisConsensusTest {
}
@Test
- public void oneMemberGroupChange1() throws Exception {
- oneMemberGroupChangeImpl(false);
- }
-
- @Test
- public void oneMemberGroupChange2() throws Exception {
- oneMemberGroupChangeImpl(true);
- }
-
- private void oneMemberGroupChangeImpl(boolean previousRemove) throws
Exception {
+ public void oneMemberGroupChange() throws Exception {
servers.get(0).createPeer(group.getGroupId(), peers.subList(0, 1));
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).addPeer(group.getGroupId(), peers.get(1));
servers.get(1).transferLeader(group.getGroupId(), peers.get(1));
- servers.get(previousRemove ? 0 : 1).removePeer(group.getGroupId(),
peers.get(0));
+ servers.get(0).removePeer(group.getGroupId(), peers.get(0));
servers.get(0).deletePeer(group.getGroupId());
}
@@ -219,12 +168,8 @@ public class RatisConsensusTest {
// 200 operation will trigger snapshot & purge
doConsensus(servers.get(0), group.getGroupId(), 200, 200);
- for (IConsensus consensus : servers) {
- consensus.stop();
- }
- servers.clear();
+ miniCluster.restart();
- makeServers();
doConsensus(servers.get(0), gid, 10, 210);
}
@@ -270,10 +215,7 @@ public class RatisConsensusTest {
for (int i = 0; i < count; i++) {
executorService.submit(
() -> {
- ByteBuffer incr = ByteBuffer.allocate(4);
- incr.putInt(1);
- incr.flip();
- ByteBufferConsensusRequest incrReq = new
ByteBufferConsensusRequest(incr);
+ ByteBufferConsensusRequest incrReq =
TestUtils.TestRequest.incrRequest();
ConsensusWriteResponse response = consensus.write(gid, incrReq);
if (response.getException() != null) {
@@ -289,10 +231,7 @@ public class RatisConsensusTest {
// wait at most 60s for write to complete, otherwise fail the test
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
- ByteBuffer get = ByteBuffer.allocate(4);
- get.putInt(2);
- get.flip();
- ByteBufferConsensusRequest getReq = new ByteBufferConsensusRequest(get);
+ ByteBufferConsensusRequest getReq = TestUtils.TestRequest.getRequest();
// wait at most 60s to discover a valid leader
long start = System.currentTimeMillis();
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 03682b03126..99f5bc9d81d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -21,13 +21,20 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
+import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +43,13 @@ import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
public class TestUtils {
public static class TestDataSet implements DataSet {
@@ -70,6 +81,20 @@ public class TestUtils {
buffer.flip();
return buffer;
}
+
+ static ByteBufferConsensusRequest incrRequest() {
+ ByteBuffer incr = ByteBuffer.allocate(4);
+ incr.putInt(1);
+ incr.flip();
+ return new ByteBufferConsensusRequest(incr);
+ }
+
+ static ByteBufferConsensusRequest getRequest() {
+ ByteBuffer get = ByteBuffer.allocate(4);
+ get.putInt(2);
+ get.flip();
+ return new ByteBufferConsensusRequest(get);
+ }
}
public static class IntegerCounter implements IStateMachine,
IStateMachine.EventApi {
@@ -174,4 +199,139 @@ public class TestUtils {
return configuration;
}
}
+
+ /** A Mini Raft Cluster Wrapper for Test Env. */
+ static class MiniCluster {
+ private final ConsensusGroupId gid;
+ private final int replicas;
+ private final List<Peer> peers;
+ private final List<File> peerStorage;
+ private final List<IStateMachine> stateMachines;
+ private final RatisConfig config;
+ private final List<RatisConsensus> servers;
+ private final ConsensusGroup group;
+
+ private MiniCluster(
+ ConsensusGroupId gid,
+ int replicas,
+ Function<Integer, File> storageProvider,
+ Supplier<IStateMachine> smProvider,
+ RatisConfig config) {
+ this.gid = gid;
+ this.replicas = replicas;
+ this.config = config;
+ Preconditions.checkArgument(
+ replicas % 2 == 1, "Test Env Raft Group should consists singular
peers");
+
+ this.peers = new ArrayList<>();
+ this.peerStorage = new ArrayList<>();
+ this.stateMachines = new ArrayList<>();
+ this.servers = new ArrayList<>();
+
+ for (int i = 0; i < replicas; i++) {
+ peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", 6001 + i)));
+
+ final File storage = storageProvider.apply(i);
+ FileUtils.deleteFileQuietly(storage);
+ storage.mkdirs();
+ peerStorage.add(storage);
+
+ stateMachines.add(smProvider.get());
+ }
+ group = new ConsensusGroup(gid, peers);
+ makeServers();
+ }
+
+ private void makeServers() {
+ for (int i = 0; i < replicas; i++) {
+ final int fi = i;
+ servers.add(
+ (RatisConsensus)
+ ConsensusFactory.getConsensusImpl(
+ ConsensusFactory.RATIS_CONSENSUS,
+ ConsensusConfig.newBuilder()
+ .setThisNodeId(peers.get(i).getNodeId())
+ .setThisNode(peers.get(i).getEndpoint())
+ .setRatisConfig(config)
+
.setStorageDir(this.peerStorage.get(i).getAbsolutePath())
+ .build(),
+ groupId -> stateMachines.get(fi))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ ConsensusFactory.RATIS_CONSENSUS))));
+ }
+ }
+
+ void start() throws IOException {
+ for (RatisConsensus server : servers) {
+ server.start();
+ }
+ }
+
+ void stop() throws IOException {
+ for (RatisConsensus server : servers) {
+ server.stop();
+ }
+ }
+
+ void cleanUp() throws IOException {
+ stop();
+ for (File storage : peerStorage) {
+ FileUtils.deleteFully(storage);
+ }
+ }
+
+ void restart() throws IOException {
+ stop();
+ servers.clear();
+ makeServers();
+ start();
+ }
+
+ List<RatisConsensus> getServers() {
+ return Collections.unmodifiableList(servers);
+ }
+
+ RatisConsensus getServer(int index) {
+ return servers.get(index);
+ }
+
+ List<IStateMachine> getStateMachines() {
+ return Collections.unmodifiableList(stateMachines);
+ }
+
+ ConsensusGroupId getGid() {
+ return gid;
+ }
+
+ List<Peer> getPeers() {
+ return peers;
+ }
+
+ ConsensusGroup getGroup() {
+ return group;
+ }
+ }
+
+ static class MiniClusterFactory {
+ private int replicas = 3;
+ private ConsensusGroupId gid = new DataRegionId(1);
+ private Function<Integer, File> peerStorageProvider =
+ peerId -> new File("target" + java.io.File.separator + peerId);
+
+ private Supplier<IStateMachine> smProvider = TestUtils.IntegerCounter::new;
+ private RatisConfig ratisConfig;
+
+ MiniClusterFactory setRatisConfig(RatisConfig ratisConfig) {
+ this.ratisConfig = ratisConfig;
+ return this;
+ }
+
+ MiniCluster create() {
+ return new MiniCluster(gid, replicas, peerStorageProvider, smProvider,
ratisConfig);
+ }
+ }
}