This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_catch_up by this push:
     new 8ec7aee64d rename peer to peerinfo
8ec7aee64d is described below

commit 8ec7aee64dd886a0eb168436fc9d0a1299cd5864
Author: jt <[email protected]>
AuthorDate: Mon May 9 10:40:39 2022 +0800

    rename peer to peerinfo
---
 .../cluster/impl/NativeSingleRaftConsensus.java    | 90 ++++++++++++++++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  8 +-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     | 24 +++---
 .../server/handlers/caller/HeartbeatHandler.java   | 22 +++---
 .../cluster/server/member/DataGroupMember.java     |  6 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 24 +++---
 .../server/monitor/{Peer.java => PeerInfo.java}    |  4 +-
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 40 +++++-----
 .../caller/AppendNodeEntryHandlerTest.java         | 10 +--
 .../apache/iotdb/consensus/ISingleConsensus.java   | 58 ++++++++++++++
 10 files changed, 216 insertions(+), 70 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
new file mode 100644
index 0000000000..b5332032c9
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.impl;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.consensus.ISingleConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+
+public class NativeSingleRaftConsensus implements ISingleConsensus {
+
+  private RaftMember raftMember;
+
+  @Override
+  public void start() throws IOException {
+    raftMember.start();
+  }
+
+  @Override
+  public void stop() throws IOException {
+    raftMember.stop();
+  }
+
+  @Override
+  public ConsensusWriteResponse write(IConsensusRequest request) {
+    return null;
+  }
+
+  @Override
+  public ConsensusReadResponse read(IConsensusRequest request) {
+    return null;
+  }
+
+  @Override
+  public ConsensusGenericResponse addPeer(Peer peer) {
+    return null;
+  }
+
+  @Override
+  public ConsensusGenericResponse removePeer(Peer peer) {
+    return null;
+  }
+
+  @Override
+  public ConsensusGenericResponse changePeer(List<Peer> newPeers) {
+    return null;
+  }
+
+  @Override
+  public ConsensusGenericResponse transferLeader(Peer newLeader) {
+    return null;
+  }
+
+  @Override
+  public ConsensusGenericResponse triggerSnapshot() {
+    return null;
+  }
+
+  @Override
+  public boolean isLeader() {
+    return false;
+  }
+
+  @Override
+  public Peer getLeader() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 4bd540a91c..9377782127 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -32,7 +32,7 @@ import 
org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -283,7 +283,7 @@ public class LogDispatcher {
     Node receiver;
     private BlockingQueue<SendLogRequest> logBlockingDeque;
     protected List<SendLogRequest> currBatch = new ArrayList<>();
-    private Peer peer;
+    private PeerInfo peerInfo;
     Client syncClient;
     AsyncClient asyncClient;
     private String baseName;
@@ -291,7 +291,7 @@ public class LogDispatcher {
     DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> 
logBlockingDeque) {
       this.receiver = receiver;
       this.logBlockingDeque = logBlockingDeque;
-      this.peer = member.getPeer(receiver);
+      this.peerInfo = member.getPeer(receiver);
       if (!clusterConfig.isUseAsyncServer()) {
         syncClient = member.getSyncClient(receiver);
       }
@@ -356,7 +356,7 @@ public class LogDispatcher {
         List<ByteBuffer> logList, AppendEntriesRequest request, 
List<SendLogRequest> currBatch) {
 
       long startTime = 
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
-      if (!member.waitForPrevLog(peer, 
currBatch.get(0).getVotingLog().getLog())) {
+      if (!member.waitForPrevLog(peerInfo, 
currBatch.get(0).getVotingLog().getLog())) {
         logger.warn(
             "{}: node {} timed out when appending {}",
             member.getName(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index cee7b524e0..2e7e6f395b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 
@@ -50,7 +50,7 @@ public class CatchUpTask implements Runnable {
   private static final Logger logger = 
LoggerFactory.getLogger(CatchUpTask.class);
 
   private Node node;
-  private Peer peer;
+  private PeerInfo peerInfo;
   private RaftMember raftMember;
   private Snapshot snapshot;
   private List<Log> logs;
@@ -61,10 +61,10 @@ public class CatchUpTask implements Runnable {
 
   private long startTime;
 
-  public CatchUpTask(Node node, int raftId, Peer peer, RaftMember raftMember, 
long lastLogIdx) {
+  public CatchUpTask(Node node, int raftId, PeerInfo peerInfo, RaftMember 
raftMember, long lastLogIdx) {
     this.node = node;
     this.raftId = raftId;
-    this.peer = peer;
+    this.peerInfo = peerInfo;
     this.raftMember = raftMember;
     this.logs = Collections.emptyList();
     this.snapshot = null;
@@ -88,7 +88,7 @@ public class CatchUpTask implements Runnable {
     try {
       // to avoid snapshot catch up when index is volatile
       localFirstIndex = raftMember.getLogManager().getFirstIndex();
-      lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
+      lo = Math.max(localFirstIndex, peerInfo.getMatchIndex() + 1);
       hi = raftMember.getLogManager().getLastLogIndex() + 1;
       logs = raftMember.getLogManager().getEntries(lo, hi);
 
@@ -121,7 +121,7 @@ public class CatchUpTask implements Runnable {
       if (!judgeUseLogsInDiskToCatchUp()) {
         return false;
       }
-      long startIndex = peer.getMatchIndex() + 1;
+      long startIndex = peerInfo.getMatchIndex() + 1;
       long endIndex = raftMember.getLogManager().getCommitLogIndex();
       List<Log> logsInDisk = getLogsInStableEntryManager(startIndex, endIndex);
       if (!logsInDisk.isEmpty()) {
@@ -162,7 +162,7 @@ public class CatchUpTask implements Runnable {
     }
     logger.info("{}: {} matches at {}", name, node, newMatchedIndex);
 
-    peer.setMatchIndex(newMatchedIndex);
+    peerInfo.setMatchIndex(newMatchedIndex);
     // if follower return RESPONSE.AGREE with this empty log, then start 
sending real logs from
     // index.
     logs.subList(0, index).clear();
@@ -322,7 +322,7 @@ public class CatchUpTask implements Runnable {
     } catch (IOException e) {
       logger.error("Unexpected error when taking snapshot.", e);
     }
-    snapshot = raftMember.getLogManager().getSnapshot(peer.getMatchIndex());
+    snapshot = 
raftMember.getLogManager().getSnapshot(peerInfo.getMatchIndex());
     if (logger.isInfoEnabled()) {
       logger.info("{}: Logs in {} are too old, catch up with snapshot", 
raftMember.getName(), node);
     }
@@ -351,7 +351,7 @@ public class CatchUpTask implements Runnable {
     try {
       boolean findMatchedIndex = checkMatchIndex();
       if (abort) {
-        peer.resetInconsistentHeartbeatNum();
+        peerInfo.resetInconsistentHeartbeatNum();
         raftMember.getLastCatchUpResponseTime().remove(node);
         return;
       }
@@ -377,17 +377,17 @@ public class CatchUpTask implements Runnable {
               !logs.isEmpty()
                   ? logs.get(logs.size() - 1).getCurrLogIndex()
                   : snapshot.getLastLogIndex();
-          peer.setMatchIndex(lastIndex);
+          peerInfo.setMatchIndex(lastIndex);
         }
         if (logger.isInfoEnabled()) {
           logger.info(
               "{}: Catch up {} finished, update it's matchIndex to {}, time 
consumption: {}ms",
               raftMember.getName(),
               node,
-              peer.getMatchIndex(),
+              peerInfo.getMatchIndex(),
               System.currentTimeMillis() - startTime);
         }
-        peer.resetInconsistentHeartbeatNum();
+        peerInfo.resetInconsistentHeartbeatNum();
       }
 
     } catch (LeaderUnknownException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 1d5dcabb83..577125463d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.server.handlers.caller;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -100,21 +100,21 @@ public class HeartbeatHandler implements 
AsyncMethodCallback<HeartBeatResponse>
           localLastLogTerm);
     }
 
-    Peer peer = localMember.getPeer(follower);
+    PeerInfo peerInfo = localMember.getPeer(follower);
     if (!localMember.getLogManager().isLogUpToDate(lastLogTerm, lastLogIdx)
         || !localMember.getLogManager().matchTerm(lastLogTerm, lastLogIdx)) {
       // the follower is not up-to-date
-      if (lastLogIdx == -1 || lastLogIdx < peer.getMatchIndex()) {
+      if (lastLogIdx == -1 || lastLogIdx < peerInfo.getMatchIndex()) {
         // maybe the follower has restarted, so we need to find its match 
index again, because
         // some logs may be lost due to restart
-        peer.setMatchIndex(lastLogIdx);
+        peerInfo.setMatchIndex(lastLogIdx);
       }
 
       // only start a catch up when the follower's lastLogIndex remains stall 
and unchanged for 5
       // heartbeats. If the follower is installing snapshot currently, we 
reset the counter.
-      if (lastLogIdx == peer.getLastHeartBeatIndex() && 
!resp.isInstallingSnapshot()) {
+      if (lastLogIdx == peerInfo.getLastHeartBeatIndex() && 
!resp.isInstallingSnapshot()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent 
counter
-        int inconsistentNum = peer.incInconsistentHeartbeatNum();
+        int inconsistentNum = peerInfo.incInconsistentHeartbeatNum();
         if (inconsistentNum >= 1000) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match 
index {}",
@@ -124,20 +124,20 @@ public class HeartbeatHandler implements 
AsyncMethodCallback<HeartBeatResponse>
               lastLogTerm,
               localLastLogIdx,
               localLastLogTerm,
-              peer.getMatchIndex());
+              peerInfo.getMatchIndex());
           localMember.catchUp(follower, lastLogIdx);
         }
       } else {
         // the follower's lastLogIndex is changed, which means the follower is 
not down yet, we
         // reset the counter to see if it can eventually catch up by itself
-        peer.resetInconsistentHeartbeatNum();
+        peerInfo.resetInconsistentHeartbeatNum();
       }
     } else {
       // the follower is up-to-date
-      peer.setMatchIndex(Math.max(peer.getMatchIndex(), lastLogIdx));
-      peer.resetInconsistentHeartbeatNum();
+      peerInfo.setMatchIndex(Math.max(peerInfo.getMatchIndex(), lastLogIdx));
+      peerInfo.resetInconsistentHeartbeatNum();
     }
-    peer.setLastHeartBeatIndex(lastLogIdx);
+    peerInfo.setLastHeartBeatIndex(lastLogIdx);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 5c3f817be0..b8e593eef1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -67,7 +67,7 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
 import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.IOUtils;
@@ -373,7 +373,7 @@ public class DataGroupMember extends RaftMember implements 
DataGroupMemberMBean
       }
       if (insertIndex > 0) {
         allNodes.add(insertIndex, node);
-        peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
+        peerMap.putIfAbsent(node, new PeerInfo(logManager.getLastLogIndex()));
         // if the local node is the last node and the insertion succeeds, this 
node should leave
         // the group
         logger.debug("{}: Node {} is inserted into the data group {}", name, 
node, allNodes);
@@ -933,7 +933,7 @@ public class DataGroupMember extends RaftMember implements 
DataGroupMemberMBean
         }
         Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
         allNodes.add(newNodeToGroup);
-        peerMap.putIfAbsent(newNodeToGroup, new 
Peer(logManager.getLastLogIndex()));
+        peerMap.putIfAbsent(newNodeToGroup, new 
PeerInfo(logManager.getLastLogIndex()));
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index b108890a7f..ffdbeb5dc7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -73,7 +73,7 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -143,8 +143,6 @@ public abstract class RaftMember implements RaftMemberMBean 
{
 
   private static final Logger logger = 
LoggerFactory.getLogger(RaftMember.class);
   public static boolean USE_LOG_DISPATCHER = true;
-  private static final boolean USE_INDIRECT_LOG_DISPATCHER =
-      ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting();
   private static final boolean ENABLE_WEAK_ACCEPTANCE =
       ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance();
   public static boolean USE_CRAFT = false;
@@ -189,7 +187,7 @@ public abstract class RaftMember implements RaftMemberMBean 
{
   /** to choose nodes to send request of joining cluster randomly. */
   Random random = new Random();
   /** when the node is a leader, this map is used to track log progress of 
each follower. */
-  Map<Node, Peer> peerMap;
+  Map<Node, PeerInfo> peerMap;
   /**
    * the current term of the node, this object also works as lock of some 
transactions of the member
    * like elections.
@@ -1373,12 +1371,12 @@ public abstract class RaftMember implements 
RaftMemberMBean {
   public void initPeerMap() {
     peerMap = new ConcurrentHashMap<>();
     for (Node entry : allNodes) {
-      peerMap.computeIfAbsent(entry, k -> new 
Peer(logManager.getLastLogIndex()));
+      peerMap.computeIfAbsent(entry, k -> new 
PeerInfo(logManager.getLastLogIndex()));
     }
   }
 
-  public Peer getPeer(Node node) {
-    return peerMap.computeIfAbsent(node, r -> new 
Peer(getLogManager().getLastLogIndex()));
+  public PeerInfo getPeer(Node node) {
+    return peerMap.computeIfAbsent(node, r -> new 
PeerInfo(getLogManager().getLastLogIndex()));
   }
 
   /** @return true if there is a log whose index is "index" and term is 
"term", false otherwise */
@@ -2143,8 +2141,8 @@ public abstract class RaftMember implements 
RaftMemberMBean {
      * too many waiting requests on the peer's side.
      */
     long startTime = 
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
-    Peer peer = peerMap.computeIfAbsent(node, k -> new 
Peer(logManager.getLastLogIndex()));
-    if (!waitForPrevLog(peer, log.getLog())) {
+    PeerInfo peerInfo = peerMap.computeIfAbsent(node, k -> new 
PeerInfo(logManager.getLastLogIndex()));
+    if (!waitForPrevLog(peerInfo, log.getLog())) {
       logger.warn("{}: node {} timed out when appending {}", name, node, log);
       return;
     }
@@ -2166,18 +2164,18 @@ public abstract class RaftMember implements 
RaftMemberMBean {
    * no bigger than maxLogDiff.
    */
   @SuppressWarnings("java:S2445") // safe synchronized
-  public boolean waitForPrevLog(Peer peer, Log log) {
+  public boolean waitForPrevLog(PeerInfo peerInfo, Log log) {
     final int maxLogDiff = config.getMaxNumOfLogsInMem();
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
     // if the peer falls behind too much, wait until it catches up, otherwise 
there may be too
     // many client threads in the peer
-    while (peer.getMatchIndex() < log.getCurrLogIndex() - maxLogDiff
+    while (peerInfo.getMatchIndex() < log.getCurrLogIndex() - maxLogDiff
         && character == NodeCharacter.LEADER
         && alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
-      synchronized (peer) {
+      synchronized (peerInfo) {
         try {
-          peer.wait(ClusterConstant.getWriteOperationTimeoutMS());
+          peerInfo.wait(ClusterConstant.getWriteOperationTimeoutMS());
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("Waiting for peer to catch up interrupted");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/PeerInfo.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/PeerInfo.java
index d012d60ebc..777d85552e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/PeerInfo.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class Peer {
+public class PeerInfo {
 
   private long nextIndex;
   private long matchIndex;
@@ -29,7 +29,7 @@ public class Peer {
   // lastLogIndex from the last heartbeat
   private long lastHeartBeatIndex;
 
-  public Peer(long nextIndex) {
+  public PeerInfo(long nextIndex) {
     this.nextIndex = nextIndex;
     this.matchIndex = -1;
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index 1e321acd91..3c4c812862 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -42,7 +42,7 @@ import 
org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 
@@ -221,9 +221,9 @@ public class CatchUpTaskTest {
     
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    Peer peer = new Peer(10);
-    peer.setMatchIndex(9);
-    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 9);
+    PeerInfo peerInfo = new PeerInfo(10);
+    peerInfo.setMatchIndex(9);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 9);
     task.run();
 
     assertTrue(receivedLogs.isEmpty());
@@ -246,9 +246,9 @@ public class CatchUpTaskTest {
     
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    Peer peer = new Peer(10);
-    peer.setMatchIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
+    PeerInfo peerInfo = new PeerInfo(10);
+    peerInfo.setMatchIndex(0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 5);
     task.run();
 
     assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -278,9 +278,9 @@ public class CatchUpTaskTest {
           
.setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
       Node receiver = new Node();
       sender.setCharacter(NodeCharacter.LEADER);
-      Peer peer = new Peer(10);
-      peer.setMatchIndex(0);
-      CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
+      PeerInfo peerInfo = new PeerInfo(10);
+      peerInfo.setMatchIndex(0);
+      CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 5);
       task.run();
 
       assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -304,9 +304,9 @@ public class CatchUpTaskTest {
     
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    Peer peer = new Peer(10);
-    peer.setNextIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
+    PeerInfo peerInfo = new PeerInfo(10);
+    peerInfo.setNextIndex(0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 0);
     ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
     task.run();
 
@@ -328,9 +328,9 @@ public class CatchUpTaskTest {
     
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    Peer peer = new Peer(10);
-    peer.setNextIndex(0);
-    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
+    PeerInfo peerInfo = new PeerInfo(10);
+    peerInfo.setNextIndex(0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 0);
     task.run();
 
     assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -356,11 +356,11 @@ public class CatchUpTaskTest {
     
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
     Node receiver = new Node();
     sender.setCharacter(NodeCharacter.LEADER);
-    Peer peer = new Peer(10);
-    peer.setMatchIndex(0);
-    peer.setNextIndex(0);
+    PeerInfo peerInfo = new PeerInfo(10);
+    peerInfo.setMatchIndex(0);
+    peerInfo.setNextIndex(0);
 
-    CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
+    CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 0);
     task.setLogs(logList);
     try {
       // 1. case 1: the matched index is in the middle of the logs interval
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 3c82a4241a..f69f144054 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 
@@ -71,7 +71,7 @@ public class AppendNodeEntryHandlerTest {
       ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
       VotingLog votingLog = new VotingLog(log, 10);
       member.getVotingLogList().insert(votingLog);
-      Peer peer = new Peer(1);
+      PeerInfo peerInfo = new PeerInfo(1);
       for (int i = 0; i < 10; i++) {
         AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
         handler.setLeaderShipStale(leadershipStale);
@@ -105,7 +105,7 @@ public class AppendNodeEntryHandlerTest {
     Log log = new TestLog();
     VotingLog votingLog = new VotingLog(log, 10);
     member.getVotingLogList().insert(votingLog);
-    Peer peer = new Peer(1);
+    PeerInfo peerInfo = new PeerInfo(1);
 
     for (int i = 0; i < 3; i++) {
       AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -131,7 +131,7 @@ public class AppendNodeEntryHandlerTest {
     AtomicBoolean leadershipStale = new AtomicBoolean(false);
     Log log = new TestLog();
     VotingLog votingLog = new VotingLog(log, 10);
-    Peer peer = new Peer(1);
+    PeerInfo peerInfo = new PeerInfo(1);
 
     synchronized (votingLog) {
       AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -158,7 +158,7 @@ public class AppendNodeEntryHandlerTest {
     ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
     try {
       VotingLog votingLog = new VotingLog(log, 10);
-      Peer peer = new Peer(1);
+      PeerInfo peerInfo = new PeerInfo(1);
 
       AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
       handler.setLeaderShipStale(leadershipStale);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ISingleConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ISingleConsensus.java
new file mode 100644
index 0000000000..60fa3be95b
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ISingleConsensus.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.consensus;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+
+public interface ISingleConsensus {
+
+  void start() throws IOException;
+
+  void stop() throws IOException;
+
+  // write API
+  ConsensusWriteResponse write(IConsensusRequest request);
+  // read API
+  ConsensusReadResponse read(IConsensusRequest request);
+
+  // single consensus group API
+  ConsensusGenericResponse addPeer(Peer peer);
+
+  ConsensusGenericResponse removePeer(Peer peer);
+
+  ConsensusGenericResponse changePeer(List<Peer> newPeers);
+
+  // management API
+  ConsensusGenericResponse transferLeader(Peer newLeader);
+
+  ConsensusGenericResponse triggerSnapshot();
+
+  boolean isLeader();
+
+  Peer getLeader();
+}

Reply via email to