This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 344b0891ff7 [IOTDB-5985] Refactor RatisConsensus UT framework (#10110)
344b0891ff7 is described below

commit 344b0891ff757f2918b9eb6d42bc6827830bf640
Author: William Song <[email protected]>
AuthorDate: Mon Jun 19 00:34:45 2023 +0800

    [IOTDB-5985] Refactor RatisConsensus UT framework (#10110)
---
 .../iotdb/consensus/ratis/RatisConsensusTest.java  | 153 ++++++--------------
 .../apache/iotdb/consensus/ratis/TestUtils.java    | 160 +++++++++++++++++++++
 2 files changed, 206 insertions(+), 107 deletions(-)

diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index cb86a0f79d9..f0c19f039f8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -18,32 +18,25 @@
  */
 package org.apache.iotdb.consensus.ratis;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 
-import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -55,90 +48,54 @@ public class RatisConsensusTest {
 
   private ConsensusGroupId gid;
   private List<Peer> peers;
-  private List<File> peersStorage;
-  private List<IConsensus> servers;
-  private List<TestUtils.IntegerCounter> stateMachines;
+  private List<RatisConsensus> servers;
+  private List<IStateMachine> stateMachines;
   private ConsensusGroup group;
   CountDownLatch latch;
 
-  private void makeServers() throws IOException {
-    for (int i = 0; i < 3; i++) {
-      stateMachines.add(new TestUtils.IntegerCounter());
-      RatisConfig config =
-          RatisConfig.newBuilder()
-              .setLog(
-                  RatisConfig.Log.newBuilder()
-                      .setPurgeUptoSnapshotIndex(true)
-                      .setPurgeGap(10)
-                      .setUnsafeFlushEnabled(false)
-                      .build())
-              .setSnapshot(
-                  RatisConfig.Snapshot.newBuilder()
-                      .setAutoTriggerThreshold(100)
-                      .setCreationGap(10)
-                      .build())
-              .setRpc(
-                  RatisConfig.Rpc.newBuilder()
-                      .setFirstElectionTimeoutMin(TimeDuration.valueOf(1, TimeUnit.SECONDS))
-                      .setFirstElectionTimeoutMax(TimeDuration.valueOf(4, TimeUnit.SECONDS))
-                      .setTimeoutMin(TimeDuration.valueOf(1, TimeUnit.SECONDS))
-                      .setTimeoutMax(TimeDuration.valueOf(4, TimeUnit.SECONDS))
-                      .build())
-              .setImpl(
-                  RatisConfig.Impl.newBuilder()
-                      .setTriggerSnapshotFileSize(1)
-                      .setTriggerSnapshotTime(4)
-                      .build())
-              .build();
-      int finalI = i;
-      servers.add(
-          ConsensusFactory.getConsensusImpl(
-                  ConsensusFactory.RATIS_CONSENSUS,
-                  ConsensusConfig.newBuilder()
-                      .setThisNodeId(peers.get(i).getNodeId())
-                      .setThisNode(peers.get(i).getEndpoint())
-                      .setRatisConfig(config)
-                      .setStorageDir(peersStorage.get(i).getAbsolutePath())
-                      .build(),
-                  groupId -> stateMachines.get(finalI))
-              .orElseThrow(
-                  () ->
-                      new IllegalArgumentException(
-                          String.format(
-                              ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                              ConsensusFactory.RATIS_CONSENSUS))));
-      servers.get(i).start();
-    }
-  }
+  private TestUtils.MiniCluster miniCluster;
+
+  private final RatisConfig config =
+      RatisConfig.newBuilder()
+          .setLog(
+              RatisConfig.Log.newBuilder()
+                  .setPurgeUptoSnapshotIndex(true)
+                  .setPurgeGap(10)
+                  .setUnsafeFlushEnabled(false)
+                  .build())
+          .setSnapshot(
+              RatisConfig.Snapshot.newBuilder()
+                  .setAutoTriggerThreshold(100)
+                  .setCreationGap(10)
+                  .build())
+          .setRpc(
+              RatisConfig.Rpc.newBuilder()
+                  .setFirstElectionTimeoutMin(TimeDuration.valueOf(1, TimeUnit.SECONDS))
+                  .setFirstElectionTimeoutMax(TimeDuration.valueOf(4, TimeUnit.SECONDS))
+                  .setTimeoutMin(TimeDuration.valueOf(1, TimeUnit.SECONDS))
+                  .setTimeoutMax(TimeDuration.valueOf(4, TimeUnit.SECONDS))
+                  .build())
+          .setImpl(
+              RatisConfig.Impl.newBuilder()
+                  .setTriggerSnapshotFileSize(1)
+                  .setTriggerSnapshotTime(4)
+                  .build())
+          .build();
 
   @Before
   public void setUp() throws IOException {
-    gid = new DataRegionId(1);
-    peers = new ArrayList<>();
-    peers.add(new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)));
-    peers.add(new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)));
-    peers.add(new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002)));
-    peersStorage = new ArrayList<>();
-    peersStorage.add(new File("target" + java.io.File.separator + "1"));
-    peersStorage.add(new File("target" + java.io.File.separator + "2"));
-    peersStorage.add(new File("target" + java.io.File.separator + "3"));
-    for (File dir : peersStorage) {
-      dir.mkdirs();
-    }
-    group = new ConsensusGroup(gid, peers);
-    servers = new ArrayList<>();
-    stateMachines = new ArrayList<>();
-    makeServers();
+    miniCluster = new TestUtils.MiniClusterFactory().setRatisConfig(config).create();
+    miniCluster.start();
+    gid = miniCluster.getGid();
+    servers = miniCluster.getServers();
+    group = miniCluster.getGroup();
+    peers = miniCluster.getPeers();
+    stateMachines = miniCluster.getStateMachines();
   }
 
   @After
   public void tearDown() throws IOException {
-    for (int i = 0; i < 3; i++) {
-      servers.get(i).stop();
-    }
-    for (File file : peersStorage) {
-      FileUtils.deleteFully(file);
-    }
+    miniCluster.cleanUp();
   }
 
   @Test
@@ -168,7 +125,8 @@ public class RatisConsensusTest {
     servers.get(2).createPeer(group.getGroupId(), Collections.emptyList());
     servers.get(0).changePeer(group.getGroupId(), peers);
 
-    Assert.assertEquals(stateMachines.get(0).getConfiguration().size(), 3);
+    Assert.assertEquals(
+        ((TestUtils.IntegerCounter) stateMachines.get(0)).getConfiguration().size(), 3);
     doConsensus(servers.get(0), group.getGroupId(), 10, 20);
   }
 
@@ -190,23 +148,14 @@ public class RatisConsensusTest {
   }
 
   @Test
-  public void oneMemberGroupChange1() throws Exception {
-    oneMemberGroupChangeImpl(false);
-  }
-
-  @Test
-  public void oneMemberGroupChange2() throws Exception {
-    oneMemberGroupChangeImpl(true);
-  }
-
-  private void oneMemberGroupChangeImpl(boolean previousRemove) throws Exception {
+  public void oneMemberGroupChange() throws Exception {
     servers.get(0).createPeer(group.getGroupId(), peers.subList(0, 1));
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
 
     servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
     servers.get(0).addPeer(group.getGroupId(), peers.get(1));
     servers.get(1).transferLeader(group.getGroupId(), peers.get(1));
-    servers.get(previousRemove ? 0 : 1).removePeer(group.getGroupId(), peers.get(0));
+    servers.get(0).removePeer(group.getGroupId(), peers.get(0));
     servers.get(0).deletePeer(group.getGroupId());
   }
 
@@ -219,12 +168,8 @@ public class RatisConsensusTest {
     // 200 operation will trigger snapshot & purge
     doConsensus(servers.get(0), group.getGroupId(), 200, 200);
 
-    for (IConsensus consensus : servers) {
-      consensus.stop();
-    }
-    servers.clear();
+    miniCluster.restart();
 
-    makeServers();
     doConsensus(servers.get(0), gid, 10, 210);
   }
 
@@ -270,10 +215,7 @@ public class RatisConsensusTest {
     for (int i = 0; i < count; i++) {
       executorService.submit(
           () -> {
-            ByteBuffer incr = ByteBuffer.allocate(4);
-            incr.putInt(1);
-            incr.flip();
-            ByteBufferConsensusRequest incrReq = new ByteBufferConsensusRequest(incr);
+            ByteBufferConsensusRequest incrReq = TestUtils.TestRequest.incrRequest();
 
             ConsensusWriteResponse response = consensus.write(gid, incrReq);
             if (response.getException() != null) {
@@ -289,10 +231,7 @@ public class RatisConsensusTest {
     // wait at most 60s for write to complete, otherwise fail the test
     Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
 
-    ByteBuffer get = ByteBuffer.allocate(4);
-    get.putInt(2);
-    get.flip();
-    ByteBufferConsensusRequest getReq = new ByteBufferConsensusRequest(get);
+    ByteBufferConsensusRequest getReq = TestUtils.TestRequest.getRequest();
 
     // wait at most 60s to discover a valid leader
     long start = System.currentTimeMillis();
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 03682b03126..99f5bc9d81d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -21,13 +21,20 @@ package org.apache.iotdb.consensus.ratis;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.RatisConfig;
 
+import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
+import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,9 +43,13 @@ import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Scanner;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 public class TestUtils {
   public static class TestDataSet implements DataSet {
@@ -70,6 +81,20 @@ public class TestUtils {
       buffer.flip();
       return buffer;
     }
+
+    static ByteBufferConsensusRequest incrRequest() {
+      ByteBuffer incr = ByteBuffer.allocate(4);
+      incr.putInt(1);
+      incr.flip();
+      return new ByteBufferConsensusRequest(incr);
+    }
+
+    static ByteBufferConsensusRequest getRequest() {
+      ByteBuffer get = ByteBuffer.allocate(4);
+      get.putInt(2);
+      get.flip();
+      return new ByteBufferConsensusRequest(get);
+    }
   }
 
   public static class IntegerCounter implements IStateMachine, IStateMachine.EventApi {
@@ -174,4 +199,139 @@ public class TestUtils {
       return configuration;
     }
   }
+
+  /** A Mini Raft CLuster Wrapper for Test Env. */
+  static class MiniCluster {
+    private final ConsensusGroupId gid;
+    private final int replicas;
+    private final List<Peer> peers;
+    private final List<File> peerStorage;
+    private final List<IStateMachine> stateMachines;
+    private final RatisConfig config;
+    private final List<RatisConsensus> servers;
+    private final ConsensusGroup group;
+
+    private MiniCluster(
+        ConsensusGroupId gid,
+        int replicas,
+        Function<Integer, File> storageProvider,
+        Supplier<IStateMachine> smProvider,
+        RatisConfig config) {
+      this.gid = gid;
+      this.replicas = replicas;
+      this.config = config;
+      Preconditions.checkArgument(
+          replicas % 2 == 1, "Test Env Raft Group should consists singular peers");
+
+      this.peers = new ArrayList<>();
+      this.peerStorage = new ArrayList<>();
+      this.stateMachines = new ArrayList<>();
+      this.servers = new ArrayList<>();
+
+      for (int i = 0; i < replicas; i++) {
+        peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", 6001 + i)));
+
+        final File storage = storageProvider.apply(i);
+        FileUtils.deleteFileQuietly(storage);
+        storage.mkdirs();
+        peerStorage.add(storage);
+
+        stateMachines.add(smProvider.get());
+      }
+      group = new ConsensusGroup(gid, peers);
+      makeServers();
+    }
+
+    private void makeServers() {
+      for (int i = 0; i < replicas; i++) {
+        final int fi = i;
+        servers.add(
+            (RatisConsensus)
+                ConsensusFactory.getConsensusImpl(
+                        ConsensusFactory.RATIS_CONSENSUS,
+                        ConsensusConfig.newBuilder()
+                            .setThisNodeId(peers.get(i).getNodeId())
+                            .setThisNode(peers.get(i).getEndpoint())
+                            .setRatisConfig(config)
+                            .setStorageDir(this.peerStorage.get(i).getAbsolutePath())
+                            .build(),
+                        groupId -> stateMachines.get(fi))
+                    .orElseThrow(
+                        () ->
+                            new IllegalArgumentException(
+                                String.format(
+                                    ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                                    ConsensusFactory.RATIS_CONSENSUS))));
+      }
+    }
+
+    void start() throws IOException {
+      for (RatisConsensus server : servers) {
+        server.start();
+      }
+    }
+
+    void stop() throws IOException {
+      for (RatisConsensus server : servers) {
+        server.stop();
+      }
+    }
+
+    void cleanUp() throws IOException {
+      stop();
+      for (File storage : peerStorage) {
+        FileUtils.deleteFully(storage);
+      }
+    }
+
+    void restart() throws IOException {
+      stop();
+      servers.clear();
+      makeServers();
+      start();
+    }
+
+    List<RatisConsensus> getServers() {
+      return Collections.unmodifiableList(servers);
+    }
+
+    RatisConsensus getServer(int index) {
+      return servers.get(index);
+    }
+
+    List<IStateMachine> getStateMachines() {
+      return Collections.unmodifiableList(stateMachines);
+    }
+
+    ConsensusGroupId getGid() {
+      return gid;
+    }
+
+    List<Peer> getPeers() {
+      return peers;
+    }
+
+    ConsensusGroup getGroup() {
+      return group;
+    }
+  }
+
+  static class MiniClusterFactory {
+    private int replicas = 3;
+    private ConsensusGroupId gid = new DataRegionId(1);
+    private Function<Integer, File> peerStorageProvider =
+        peerId -> new File("target" + java.io.File.separator + peerId);
+
+    private Supplier<IStateMachine> smProvider = TestUtils.IntegerCounter::new;
+    private RatisConfig ratisConfig;
+
+    MiniClusterFactory setRatisConfig(RatisConfig ratisConfig) {
+      this.ratisConfig = ratisConfig;
+      return this;
+    }
+
+    MiniCluster create() {
+      return new MiniCluster(gid, replicas, peerStorageProvider, smProvider, ratisConfig);
+    }
+  }
 }

Reply via email to