This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3e3e61c RATIS-1032. Change RaftPeer and proto to support priority (#181)
3e3e61c is described below
commit 3e3e61cf4552b393d332ff78162aca3e1bd3222b
Author: runzhiwang <[email protected]>
AuthorDate: Wed Aug 26 01:13:05 2020 +0800
RATIS-1032. Change RaftPeer and proto to support priority (#181)
---
.../java/org/apache/ratis/protocol/RaftPeer.java | 20 +++++++--
.../java/org/apache/ratis/util/ProtoUtils.java | 2 +-
ratis-proto/src/main/proto/Raft.proto | 1 +
.../java/org/apache/ratis/MiniRaftCluster.java | 8 ++++
.../server/impl/RaftReconfigurationBaseTest.java | 47 ++++++++++++++++++++++
5 files changed, 74 insertions(+), 4 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index e721f15..d46ef4a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -42,23 +42,31 @@ public class RaftPeer {
private final RaftPeerId id;
/** The address of the peer. */
private final String address;
+ /** The priority of the peer. */
+ private final int priority;
private final Supplier<RaftPeerProto> raftPeerProto;
/** Construct a peer with the given id and a null address. */
public RaftPeer(RaftPeerId id) {
- this(id, (String)null);
+ this(id, (String)null, 0);
}
/** Construct a peer with the given id and address. */
public RaftPeer(RaftPeerId id, InetSocketAddress address) {
- this(id, address == null ? null : NetUtils.address2String(address));
+ this(id, address == null ? null : NetUtils.address2String(address), 0);
}
/** Construct a peer with the given id and address. */
public RaftPeer(RaftPeerId id, String address) {
+ this(id, address, 0);
+ }
+
+ /** Construct a peer with the given id, address, priority. */
+ public RaftPeer(RaftPeerId id, String address, int priority) {
this.id = Objects.requireNonNull(id, "id == null");
this.address = address;
+ this.priority = priority;
this.raftPeerProto = JavaUtils.memoize(this::buildRaftPeerProto);
}
@@ -68,6 +76,7 @@ public class RaftPeer {
if (getAddress() != null) {
builder.setAddress(getAddress());
}
+ builder.setPriority(priority);
return builder.build();
}
@@ -81,13 +90,18 @@ public class RaftPeer {
return address;
}
+ /** @return The priority of the peer. */
+ public int getPriority() {
+ return priority;
+ }
+
public RaftPeerProto getRaftPeerProto() {
return raftPeerProto.get();
}
@Override
public String toString() {
- return id + ":" + address;
+ return id + ":" + address + ":" + priority;
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 4ba87b9..97a8676 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -72,7 +72,7 @@ public interface ProtoUtils {
}
static RaftPeer toRaftPeer(RaftPeerProto p) {
- return new RaftPeer(RaftPeerId.valueOf(p.getId()), p.getAddress());
+ return new RaftPeer(RaftPeerId.valueOf(p.getId()), p.getAddress(),
p.getPriority());
}
static List<RaftPeer> toRaftPeers(List<RaftPeerProto> protos) {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 460ffc5..4b82228 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -24,6 +24,7 @@ package ratis.common;
message RaftPeerProto {
bytes id = 1; // id of the peer
string address = 2; // e.g. IP address, hostname etc.
+ uint32 priority = 3; // priority of the peer
}
message RaftGroupIdProto {
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index d9c57d1..2b7a19f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -624,10 +624,18 @@ public abstract class MiniRaftCluster implements Closeable {
return getRaftServerImpl(servers.get(id));
}
+ public RaftServerImpl getRaftServerImpl(RaftPeerId id, RaftGroupId groupId) {
+ return getRaftServerImpl(servers.get(id), groupId);
+ }
+
public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId());
}
+ public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId
groupId) {
+ return RaftServerTestUtil.getRaftServerImpl(proxy, groupId);
+ }
+
public List<RaftPeer> getPeers() {
return toRaftPeers(getServers());
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index ca78032..3ee0a46 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.List;
import static java.util.Arrays.asList;
import static
org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
@@ -69,6 +70,52 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
RaftServerConfigKeys.setStagingCatchupGap(getProperties(),
STAGING_CATCHUP_GAP);
}
+ private void checkPriority(CLUSTER cluster, RaftGroupId groupId,
List<RaftPeer> peersWithPriority)
+ throws InterruptedException {
+ RaftTestUtil.waitForLeader(cluster, groupId);
+
+ for (int i = 0; i < peersWithPriority.size(); i ++) {
+ RaftPeerId peerId = peersWithPriority.get(i).getId();
+ RaftServerImpl server = cluster.getRaftServerImpl(peerId, groupId);
+ RaftConfiguration conf = server.getState().getRaftConf();
+
+ for (int j = 0; j < peersWithPriority.size(); j ++) {
+ int priorityInConf =
conf.getPeer(peersWithPriority.get(j).getId()).getPriority();
+ Assert.assertEquals(priorityInConf,
peersWithPriority.get(j).getPriority());
+ }
+ }
+ }
+
+ @Test
+ public void testRestorePriority() throws Exception {
+ runWithNewCluster(3, cluster -> {
+ // Add groups
+ List<RaftPeer> peers = cluster.getPeers();
+
+ List<RaftPeer> peersWithPriority = new ArrayList<>();
+ for (int i = 0; i < peers.size(); i++) {
+ RaftPeer peer = peers.get(i);
+ peersWithPriority.add(new RaftPeer(peer.getId(), peer.getAddress(),
i));
+ }
+
+ final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(),
peersWithPriority);
+ LOG.info("add new group: " + newGroup);
+ try (final RaftClient client = cluster.createClient(newGroup)) {
+ for (RaftPeer p : newGroup.getPeers()) {
+ client.groupAdd(newGroup, p.getId());
+ }
+ }
+
+ RaftGroupId groupId = newGroup.getGroupId();
+
+ checkPriority(cluster, groupId, peersWithPriority);
+
+ cluster.restart(false);
+
+ checkPriority(cluster, groupId, peersWithPriority);
+ });
+ }
+
/**
* add 2 new peers (3 peers -> 5 peers), no leader change
*/