This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_catch_up by this push:
new 8ec7aee64d rename peer to peerinfo
8ec7aee64d is described below
commit 8ec7aee64dd886a0eb168436fc9d0a1299cd5864
Author: jt <[email protected]>
AuthorDate: Mon May 9 10:40:39 2022 +0800
rename peer to peerinfo
---
.../cluster/impl/NativeSingleRaftConsensus.java | 90 ++++++++++++++++++++++
.../apache/iotdb/cluster/log/LogDispatcher.java | 8 +-
.../iotdb/cluster/log/catchup/CatchUpTask.java | 24 +++---
.../server/handlers/caller/HeartbeatHandler.java | 22 +++---
.../cluster/server/member/DataGroupMember.java | 6 +-
.../iotdb/cluster/server/member/RaftMember.java | 24 +++---
.../server/monitor/{Peer.java => PeerInfo.java} | 4 +-
.../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 40 +++++-----
.../caller/AppendNodeEntryHandlerTest.java | 10 +--
.../apache/iotdb/consensus/ISingleConsensus.java | 58 ++++++++++++++
10 files changed, 216 insertions(+), 70 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
new file mode 100644
index 0000000000..b5332032c9
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.impl;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.consensus.ISingleConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+
+public class NativeSingleRaftConsensus implements ISingleConsensus {
+
+ private RaftMember raftMember;
+
+ @Override
+ public void start() throws IOException {
+ raftMember.start();
+ }
+
+ @Override
+ public void stop() throws IOException {
+ raftMember.stop();
+ }
+
+ @Override
+ public ConsensusWriteResponse write(IConsensusRequest request) {
+ return null;
+ }
+
+ @Override
+ public ConsensusReadResponse read(IConsensusRequest request) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse addPeer(Peer peer) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse removePeer(Peer peer) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse changePeer(List<Peer> newPeers) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse transferLeader(Peer newLeader) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse triggerSnapshot() {
+ return null;
+ }
+
+ @Override
+ public boolean isLeader() {
+ return false;
+ }
+
+ @Override
+ public Peer getLeader() {
+ return null;
+ }
+}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 4bd540a91c..9377782127 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -32,7 +32,7 @@ import
org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -283,7 +283,7 @@ public class LogDispatcher {
Node receiver;
private BlockingQueue<SendLogRequest> logBlockingDeque;
protected List<SendLogRequest> currBatch = new ArrayList<>();
- private Peer peer;
+ private PeerInfo peerInfo;
Client syncClient;
AsyncClient asyncClient;
private String baseName;
@@ -291,7 +291,7 @@ public class LogDispatcher {
DispatcherThread(Node receiver, BlockingQueue<SendLogRequest>
logBlockingDeque) {
this.receiver = receiver;
this.logBlockingDeque = logBlockingDeque;
- this.peer = member.getPeer(receiver);
+ this.peerInfo = member.getPeer(receiver);
if (!clusterConfig.isUseAsyncServer()) {
syncClient = member.getSyncClient(receiver);
}
@@ -356,7 +356,7 @@ public class LogDispatcher {
List<ByteBuffer> logList, AppendEntriesRequest request,
List<SendLogRequest> currBatch) {
long startTime =
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- if (!member.waitForPrevLog(peer,
currBatch.get(0).getVotingLog().getLog())) {
+ if (!member.waitForPrevLog(peerInfo,
currBatch.get(0).getVotingLog().getLog())) {
logger.warn(
"{}: node {} timed out when appending {}",
member.getName(),
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index cee7b524e0..2e7e6f395b 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -50,7 +50,7 @@ public class CatchUpTask implements Runnable {
private static final Logger logger =
LoggerFactory.getLogger(CatchUpTask.class);
private Node node;
- private Peer peer;
+ private PeerInfo peerInfo;
private RaftMember raftMember;
private Snapshot snapshot;
private List<Log> logs;
@@ -61,10 +61,10 @@ public class CatchUpTask implements Runnable {
private long startTime;
- public CatchUpTask(Node node, int raftId, Peer peer, RaftMember raftMember,
long lastLogIdx) {
+ public CatchUpTask(Node node, int raftId, PeerInfo peerInfo, RaftMember
raftMember, long lastLogIdx) {
this.node = node;
this.raftId = raftId;
- this.peer = peer;
+ this.peerInfo = peerInfo;
this.raftMember = raftMember;
this.logs = Collections.emptyList();
this.snapshot = null;
@@ -88,7 +88,7 @@ public class CatchUpTask implements Runnable {
try {
// to avoid snapshot catch up when index is volatile
localFirstIndex = raftMember.getLogManager().getFirstIndex();
- lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
+ lo = Math.max(localFirstIndex, peerInfo.getMatchIndex() + 1);
hi = raftMember.getLogManager().getLastLogIndex() + 1;
logs = raftMember.getLogManager().getEntries(lo, hi);
@@ -121,7 +121,7 @@ public class CatchUpTask implements Runnable {
if (!judgeUseLogsInDiskToCatchUp()) {
return false;
}
- long startIndex = peer.getMatchIndex() + 1;
+ long startIndex = peerInfo.getMatchIndex() + 1;
long endIndex = raftMember.getLogManager().getCommitLogIndex();
List<Log> logsInDisk = getLogsInStableEntryManager(startIndex, endIndex);
if (!logsInDisk.isEmpty()) {
@@ -162,7 +162,7 @@ public class CatchUpTask implements Runnable {
}
logger.info("{}: {} matches at {}", name, node, newMatchedIndex);
- peer.setMatchIndex(newMatchedIndex);
+ peerInfo.setMatchIndex(newMatchedIndex);
// if follower return RESPONSE.AGREE with this empty log, then start
sending real logs from
// index.
logs.subList(0, index).clear();
@@ -322,7 +322,7 @@ public class CatchUpTask implements Runnable {
} catch (IOException e) {
logger.error("Unexpected error when taking snapshot.", e);
}
- snapshot = raftMember.getLogManager().getSnapshot(peer.getMatchIndex());
+ snapshot =
raftMember.getLogManager().getSnapshot(peerInfo.getMatchIndex());
if (logger.isInfoEnabled()) {
logger.info("{}: Logs in {} are too old, catch up with snapshot",
raftMember.getName(), node);
}
@@ -351,7 +351,7 @@ public class CatchUpTask implements Runnable {
try {
boolean findMatchedIndex = checkMatchIndex();
if (abort) {
- peer.resetInconsistentHeartbeatNum();
+ peerInfo.resetInconsistentHeartbeatNum();
raftMember.getLastCatchUpResponseTime().remove(node);
return;
}
@@ -377,17 +377,17 @@ public class CatchUpTask implements Runnable {
!logs.isEmpty()
? logs.get(logs.size() - 1).getCurrLogIndex()
: snapshot.getLastLogIndex();
- peer.setMatchIndex(lastIndex);
+ peerInfo.setMatchIndex(lastIndex);
}
if (logger.isInfoEnabled()) {
logger.info(
"{}: Catch up {} finished, update it's matchIndex to {}, time
consumption: {}ms",
raftMember.getName(),
node,
- peer.getMatchIndex(),
+ peerInfo.getMatchIndex(),
System.currentTimeMillis() - startTime);
}
- peer.resetInconsistentHeartbeatNum();
+ peerInfo.resetInconsistentHeartbeatNum();
}
} catch (LeaderUnknownException e) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 1d5dcabb83..577125463d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.server.handlers.caller;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -100,21 +100,21 @@ public class HeartbeatHandler implements
AsyncMethodCallback<HeartBeatResponse>
localLastLogTerm);
}
- Peer peer = localMember.getPeer(follower);
+ PeerInfo peerInfo = localMember.getPeer(follower);
if (!localMember.getLogManager().isLogUpToDate(lastLogTerm, lastLogIdx)
|| !localMember.getLogManager().matchTerm(lastLogTerm, lastLogIdx)) {
// the follower is not up-to-date
- if (lastLogIdx == -1 || lastLogIdx < peer.getMatchIndex()) {
+ if (lastLogIdx == -1 || lastLogIdx < peerInfo.getMatchIndex()) {
// maybe the follower has restarted, so we need to find its match
index again, because
// some logs may be lost due to restart
- peer.setMatchIndex(lastLogIdx);
+ peerInfo.setMatchIndex(lastLogIdx);
}
// only start a catch up when the follower's lastLogIndex remains stall
and unchanged for 5
// heartbeats. If the follower is installing snapshot currently, we
reset the counter.
- if (lastLogIdx == peer.getLastHeartBeatIndex() &&
!resp.isInstallingSnapshot()) {
+ if (lastLogIdx == peerInfo.getLastHeartBeatIndex() &&
!resp.isInstallingSnapshot()) {
// the follower's lastLogIndex is unchanged, increase inconsistent
counter
- int inconsistentNum = peer.incInconsistentHeartbeatNum();
+ int inconsistentNum = peerInfo.incInconsistentHeartbeatNum();
if (inconsistentNum >= 1000) {
logger.info(
"{}: catching up node {}, index-term: {}-{}/{}-{}, peer match
index {}",
@@ -124,20 +124,20 @@ public class HeartbeatHandler implements
AsyncMethodCallback<HeartBeatResponse>
lastLogTerm,
localLastLogIdx,
localLastLogTerm,
- peer.getMatchIndex());
+ peerInfo.getMatchIndex());
localMember.catchUp(follower, lastLogIdx);
}
} else {
// the follower's lastLogIndex is changed, which means the follower is
not down yet, we
// reset the counter to see if it can eventually catch up by itself
- peer.resetInconsistentHeartbeatNum();
+ peerInfo.resetInconsistentHeartbeatNum();
}
} else {
// the follower is up-to-date
- peer.setMatchIndex(Math.max(peer.getMatchIndex(), lastLogIdx));
- peer.resetInconsistentHeartbeatNum();
+ peerInfo.setMatchIndex(Math.max(peerInfo.getMatchIndex(), lastLogIdx));
+ peerInfo.resetInconsistentHeartbeatNum();
}
- peer.setLastHeartBeatIndex(lastLogIdx);
+ peerInfo.setLastHeartBeatIndex(lastLogIdx);
}
@Override
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 5c3f817be0..b8e593eef1 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -67,7 +67,7 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.IOUtils;
@@ -373,7 +373,7 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
}
if (insertIndex > 0) {
allNodes.add(insertIndex, node);
- peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
+ peerMap.putIfAbsent(node, new PeerInfo(logManager.getLastLogIndex()));
// if the local node is the last node and the insertion succeeds, this
node should leave
// the group
logger.debug("{}: Node {} is inserted into the data group {}", name,
node, allNodes);
@@ -933,7 +933,7 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
}
Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
allNodes.add(newNodeToGroup);
- peerMap.putIfAbsent(newNodeToGroup, new
Peer(logManager.getLastLogIndex()));
+ peerMap.putIfAbsent(newNodeToGroup, new
PeerInfo(logManager.getLastLogIndex()));
}
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index b108890a7f..ffdbeb5dc7 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -73,7 +73,7 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -143,8 +143,6 @@ public abstract class RaftMember implements RaftMemberMBean
{
private static final Logger logger =
LoggerFactory.getLogger(RaftMember.class);
public static boolean USE_LOG_DISPATCHER = true;
- private static final boolean USE_INDIRECT_LOG_DISPATCHER =
- ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting();
private static final boolean ENABLE_WEAK_ACCEPTANCE =
ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance();
public static boolean USE_CRAFT = false;
@@ -189,7 +187,7 @@ public abstract class RaftMember implements RaftMemberMBean
{
/** to choose nodes to send request of joining cluster randomly. */
Random random = new Random();
/** when the node is a leader, this map is used to track log progress of
each follower. */
- Map<Node, Peer> peerMap;
+ Map<Node, PeerInfo> peerMap;
/**
* the current term of the node, this object also works as lock of some
transactions of the member
* like elections.
@@ -1373,12 +1371,12 @@ public abstract class RaftMember implements
RaftMemberMBean {
public void initPeerMap() {
peerMap = new ConcurrentHashMap<>();
for (Node entry : allNodes) {
- peerMap.computeIfAbsent(entry, k -> new
Peer(logManager.getLastLogIndex()));
+ peerMap.computeIfAbsent(entry, k -> new
PeerInfo(logManager.getLastLogIndex()));
}
}
- public Peer getPeer(Node node) {
- return peerMap.computeIfAbsent(node, r -> new
Peer(getLogManager().getLastLogIndex()));
+ public PeerInfo getPeer(Node node) {
+ return peerMap.computeIfAbsent(node, r -> new
PeerInfo(getLogManager().getLastLogIndex()));
}
/** @return true if there is a log whose index is "index" and term is
"term", false otherwise */
@@ -2143,8 +2141,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
* too many waiting requests on the peer's side.
*/
long startTime =
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- Peer peer = peerMap.computeIfAbsent(node, k -> new
Peer(logManager.getLastLogIndex()));
- if (!waitForPrevLog(peer, log.getLog())) {
+ PeerInfo peerInfo = peerMap.computeIfAbsent(node, k -> new
PeerInfo(logManager.getLastLogIndex()));
+ if (!waitForPrevLog(peerInfo, log.getLog())) {
logger.warn("{}: node {} timed out when appending {}", name, node, log);
return;
}
@@ -2166,18 +2164,18 @@ public abstract class RaftMember implements
RaftMemberMBean {
* no bigger than maxLogDiff.
*/
@SuppressWarnings("java:S2445") // safe synchronized
- public boolean waitForPrevLog(Peer peer, Log log) {
+ public boolean waitForPrevLog(PeerInfo peerInfo, Log log) {
final int maxLogDiff = config.getMaxNumOfLogsInMem();
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
// if the peer falls behind too much, wait until it catches up, otherwise
there may be too
// many client threads in the peer
- while (peer.getMatchIndex() < log.getCurrLogIndex() - maxLogDiff
+ while (peerInfo.getMatchIndex() < log.getCurrLogIndex() - maxLogDiff
&& character == NodeCharacter.LEADER
&& alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
- synchronized (peer) {
+ synchronized (peerInfo) {
try {
- peer.wait(ClusterConstant.getWriteOperationTimeoutMS());
+ peerInfo.wait(ClusterConstant.getWriteOperationTimeoutMS());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Waiting for peer to catch up interrupted");
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/PeerInfo.java
similarity index 97%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/PeerInfo.java
index d012d60ebc..777d85552e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/PeerInfo.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.cluster.server.monitor;
import java.util.concurrent.atomic.AtomicInteger;
-public class Peer {
+public class PeerInfo {
private long nextIndex;
private long matchIndex;
@@ -29,7 +29,7 @@ public class Peer {
// lastLogIndex from the last heartbeat
private long lastHeartBeatIndex;
- public Peer(long nextIndex) {
+ public PeerInfo(long nextIndex) {
this.nextIndex = nextIndex;
this.matchIndex = -1;
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index 1e321acd91..3c4c812862 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -42,7 +42,7 @@ import
org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -221,9 +221,9 @@ public class CatchUpTaskTest {
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- Peer peer = new Peer(10);
- peer.setMatchIndex(9);
- CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 9);
+ PeerInfo peerInfo = new PeerInfo(10);
+ peerInfo.setMatchIndex(9);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 9);
task.run();
assertTrue(receivedLogs.isEmpty());
@@ -246,9 +246,9 @@ public class CatchUpTaskTest {
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- Peer peer = new Peer(10);
- peer.setMatchIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
+ PeerInfo peerInfo = new PeerInfo(10);
+ peerInfo.setMatchIndex(0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 5);
task.run();
assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -278,9 +278,9 @@ public class CatchUpTaskTest {
.setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- Peer peer = new Peer(10);
- peer.setMatchIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
+ PeerInfo peerInfo = new PeerInfo(10);
+ peerInfo.setMatchIndex(0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 5);
task.run();
assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -304,9 +304,9 @@ public class CatchUpTaskTest {
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- Peer peer = new Peer(10);
- peer.setNextIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
+ PeerInfo peerInfo = new PeerInfo(10);
+ peerInfo.setNextIndex(0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 0);
ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
task.run();
@@ -328,9 +328,9 @@ public class CatchUpTaskTest {
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- Peer peer = new Peer(10);
- peer.setNextIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
+ PeerInfo peerInfo = new PeerInfo(10);
+ peerInfo.setNextIndex(0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 0);
task.run();
assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -356,11 +356,11 @@ public class CatchUpTaskTest {
sender.getLogManager().setMaxHaveAppliedCommitIndex(sender.getLogManager().getCommitLogIndex());
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- Peer peer = new Peer(10);
- peer.setMatchIndex(0);
- peer.setNextIndex(0);
+ PeerInfo peerInfo = new PeerInfo(10);
+ peerInfo.setMatchIndex(0);
+ peerInfo.setNextIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peerInfo, sender, 0);
task.setLogs(logList);
try {
// 1. case 1: the matched index is in the middle of the logs interval
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 3c82a4241a..f69f144054 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -71,7 +71,7 @@ public class AppendNodeEntryHandlerTest {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
VotingLog votingLog = new VotingLog(log, 10);
member.getVotingLogList().insert(votingLog);
- Peer peer = new Peer(1);
+ PeerInfo peerInfo = new PeerInfo(1);
for (int i = 0; i < 10; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
@@ -105,7 +105,7 @@ public class AppendNodeEntryHandlerTest {
Log log = new TestLog();
VotingLog votingLog = new VotingLog(log, 10);
member.getVotingLogList().insert(votingLog);
- Peer peer = new Peer(1);
+ PeerInfo peerInfo = new PeerInfo(1);
for (int i = 0; i < 3; i++) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -131,7 +131,7 @@ public class AppendNodeEntryHandlerTest {
AtomicBoolean leadershipStale = new AtomicBoolean(false);
Log log = new TestLog();
VotingLog votingLog = new VotingLog(log, 10);
- Peer peer = new Peer(1);
+ PeerInfo peerInfo = new PeerInfo(1);
synchronized (votingLog) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
@@ -158,7 +158,7 @@ public class AppendNodeEntryHandlerTest {
ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
try {
VotingLog votingLog = new VotingLog(log, 10);
- Peer peer = new Peer(1);
+ PeerInfo peerInfo = new PeerInfo(1);
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setLeaderShipStale(leadershipStale);
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ISingleConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ISingleConsensus.java
new file mode 100644
index 0000000000..60fa3be95b
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ISingleConsensus.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.consensus;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+
+public interface ISingleConsensus {
+
+ void start() throws IOException;
+
+ void stop() throws IOException;
+
+ // write API
+ ConsensusWriteResponse write(IConsensusRequest request);
+ // read API
+ ConsensusReadResponse read(IConsensusRequest request);
+
+ // single consensus group API
+ ConsensusGenericResponse addPeer(Peer peer);
+
+ ConsensusGenericResponse removePeer(Peer peer);
+
+ ConsensusGenericResponse changePeer(List<Peer> newPeers);
+
+ // management API
+ ConsensusGenericResponse transferLeader(Peer newLeader);
+
+ ConsensusGenericResponse triggerSnapshot();
+
+ boolean isLeader();
+
+ Peer getLeader();
+}