This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6cd9f97 RATIS-1103. Add an interface for adding RaftPeers. (#227)
6cd9f97 is described below
commit 6cd9f979164f24d9bb73c9e8a71239df71686c92
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 21 09:57:42 2020 +0800
RATIS-1103. Add an interface for adding RaftPeers. (#227)
---
.../java/org/apache/ratis/client/DataStreamClient.java | 3 ---
.../main/java/org/apache/ratis/client/RaftClientRpc.java | 5 +----
.../org/apache/ratis/client/impl/DataStreamClientImpl.java | 5 -----
.../org/apache/ratis/client/impl/GroupManagementImpl.java | 2 +-
.../java/org/apache/ratis/client/impl/RaftClientImpl.java | 12 +++---------
.../apache/ratis/client/impl/RaftClientRpcWithProxy.java | 5 +++--
.../src/main/java/org/apache/ratis/protocol/RaftPeer.java | 14 +++++++++++++-
.../src/main/java/org/apache/ratis/util/PeerProxyMap.java | 6 ++++--
.../org/apache/ratis/grpc/client/GrpcClientStreamer.java | 2 +-
.../apache/ratis/netty/server/NettyServerStreamRpc.java | 4 ++--
.../java/org/apache/ratis/server/DataStreamServerRpc.java | 6 +-----
.../main/java/org/apache/ratis/server/RaftServerRpc.java | 5 +----
.../java/org/apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../java/org/apache/ratis/server/impl/RaftServerProxy.java | 2 +-
.../apache/ratis/server/impl/RaftServerRpcWithProxy.java | 7 ++++---
.../java/org/apache/ratis/server/impl/ServerState.java | 2 +-
.../src/test/java/org/apache/ratis/RetryCacheTests.java | 2 +-
.../apache/ratis/server/simulation/SimulatedClientRpc.java | 4 +++-
.../apache/ratis/server/simulation/SimulatedServerRpc.java | 3 ++-
.../java/org/apache/ratis/datastream/TestDataStream.java | 2 +-
20 files changed, 44 insertions(+), 49 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index b72e1fd..9122763 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -38,9 +38,6 @@ public interface DataStreamClient extends DataStreamApi,
Closeable {
/** Return the rpc client instance **/
DataStreamClientRpc getClientRpc();
- /** add information of the raft peers to communicate with */
- void addPeers(Iterable<RaftPeer> peers);
-
/** start the client */
void start();
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index cf8300e..9034829 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -29,7 +29,7 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/** The client side rpc of a raft service. */
-public interface RaftClientRpc extends Closeable {
+public interface RaftClientRpc extends RaftPeer.Add, Closeable {
/** Async call to send a request. */
default CompletableFuture<RaftClientReply>
sendRequestAsync(RaftClientRequest request) {
throw new UnsupportedOperationException(getClass() + " does not support this method.");
@@ -44,9 +44,6 @@ public interface RaftClientRpc extends Closeable {
/** Send a request. */
RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
- /** Add the information of the given raft servers */
- void addServers(Iterable<RaftPeer> servers);
-
/**
* Handle the given throwable. For example, try reconnecting.
*
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 9149b74..0bfd662 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -115,11 +115,6 @@ public class DataStreamClientImpl implements
DataStreamClient {
}
@Override
- public void addPeers(Iterable<RaftPeer> peers) {
- return;
- }
-
- @Override
public void close(){
dataStreamClientRpc.closeClient();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index fab45b3..e8566df 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -46,7 +46,7 @@ class GroupManagementImpl implements GroupManagementApi {
Objects.requireNonNull(newGroup, "newGroup == null");
final long callId = RaftClientImpl.nextCallId();
- client.addServers(newGroup.getPeers().stream());
+ client.getClientRpc().addRaftPeers(newGroup.getPeers());
return
client.io().sendRequest(GroupManagementRequest.newAdd(client.getId(), server,
callId, newGroup));
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7105657..4ff170a 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -60,7 +60,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
-import java.util.stream.Stream;
/** A client who sends requests to a raft service. */
public final class RaftClientImpl implements RaftClient {
@@ -133,7 +132,7 @@ public final class RaftClientImpl implements RaftClient {
this.retryPolicy = retryPolicy;
scheduler = TimeoutScheduler.getInstance();
- clientRpc.addServers(peers);
+ clientRpc.addRaftPeers(peers);
this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this,
properties));
this.streamApi = JavaUtils.memoize(() ->
MessageStreamImpl.newInstance(this, properties));
@@ -209,7 +208,7 @@ public final class RaftClientImpl implements RaftClient {
final long callId = nextCallId();
// also refresh the rpc proxies for these peers
- addServers(Arrays.stream(peersInNewConf));
+ clientRpc.addRaftPeers(peersInNewConf);
return io().sendRequestWithRetry(() -> new SetConfigurationRequest(
clientId, leaderId, groupId, callId, Arrays.asList(peersInNewConf)));
}
@@ -229,11 +228,6 @@ public final class RaftClientImpl implements RaftClient {
return blockingApi.get();
}
- void addServers(Stream<RaftPeer> peersInNewConf) {
- clientRpc.addServers(
- peersInNewConf.filter(p -> !peers.contains(p))::iterator);
- }
-
Throwable noMoreRetries(ClientRetryEvent event) {
final int attemptCount = event.getAttemptCount();
final Throwable throwable = event.getCause();
@@ -284,7 +278,7 @@ public final class RaftClientImpl implements RaftClient {
peers.clear();
peers.addAll(newPeers);
// also refresh the rpc proxies for these peers
- clientRpc.addServers(newPeers);
+ clientRpc.addRaftPeers(newPeers);
}
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
index b5fbf48..281ceb3 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.PeerProxyMap;
import java.io.Closeable;
+import java.util.Collection;
/** An abstract {@link RaftClientRpc} implementation using {@link
PeerProxyMap}. */
public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
@@ -38,8 +39,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends
Closeable>
}
@Override
- public void addServers(Iterable<RaftPeer> servers) {
- proxies.addPeers(servers);
+ public void addRaftPeers(Collection<RaftPeer> servers) {
+ proxies.addRaftPeers(servers);
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index d46ef4a..58e5a65 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,6 +22,8 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
@@ -38,6 +40,16 @@ public class RaftPeer {
return EMPTY_ARRAY;
}
+ public interface Add {
+ /** Add the given peers. */
+ void addRaftPeers(Collection<RaftPeer> peers);
+
+ /** Add the given peers. */
+ default void addRaftPeers(RaftPeer... peers) {
+ addRaftPeers(Arrays.asList(peers));
+ }
+ }
+
/** The id of the peer. */
private final RaftPeerId id;
/** The address of the peer. */
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 80f026e..4e15a40 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -26,13 +26,14 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/** A map from peer id to peer and its proxy. */
-public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
+public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add,
Closeable {
public static final Logger LOG = LoggerFactory.getLogger(PeerProxyMap.class);
/** Peer and its proxy. */
@@ -113,7 +114,8 @@ public class PeerProxyMap<PROXY extends Closeable>
implements Closeable {
return p.getProxy();
}
- public void addPeers(Iterable<RaftPeer> newPeers) {
+ @Override
+ public void addRaftPeers(Collection<RaftPeer> newPeers) {
for(RaftPeer p : newPeers) {
computeIfAbsent(p);
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
index 7f53453..ccb182f 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
@@ -101,7 +101,7 @@ public class GrpcClientStreamer implements Closeable {
proxyMap = new PeerProxyMap<>(clientId.toString(),
raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer,
ResponseHandler::new, prop, tlsConfig));
- proxyMap.addPeers(group.getPeers());
+ proxyMap.addRaftPeers(group.getPeers());
refreshLeaderProxy(leaderId, null);
senderThread = new Sender();
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 385a9f7..5f41c0f 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -78,7 +78,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
void addPeers(Collection<RaftPeer> newPeers) {
// add to the map first in order to preserve the invariant.
- map.addPeers(newPeers);
+ map.addRaftPeers(newPeers);
// must use atomic addAll
peers.addAll(newPeers);
}
@@ -145,7 +145,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
@Override
- public void addPeers(Collection<RaftPeer> newPeers) {
+ public void addRaftPeers(Collection<RaftPeer> newPeers) {
proxies.addPeers(newPeers);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index ff12b57..2653000 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -20,18 +20,14 @@ package org.apache.ratis.server;
import org.apache.ratis.protocol.RaftPeer;
import java.io.Closeable;
-import java.util.Collection;
/**
* A server interface handling incoming streams
* Relays those streams to other servers after persisting
*/
-public interface DataStreamServerRpc extends Closeable {
+public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
/**
* start server
*/
void start();
-
- /** Add the given peers */
- void addPeers(Collection<RaftPeer> peers);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index ee34d88..6172f1e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -32,7 +32,7 @@ import java.util.Objects;
* An server-side interface for supporting different RPC implementations
* such as Netty, gRPC and Hadoop.
*/
-public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get,
Closeable {
+public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get,
RaftPeer.Add, Closeable {
/** To build {@link RaftServerRpc} objects. */
abstract class Builder<B extends Builder, RPC extends RaftServerRpc> {
private RaftServer server;
@@ -58,9 +58,6 @@ public interface RaftServerRpc extends RaftServerProtocol,
RpcType.Get, Closeabl
/** @return the address where this RPC server is listening to. */
InetSocketAddress getInetSocketAddress();
- /** add information of the given peers */
- void addPeers(Iterable<RaftPeer> peers);
-
/** Handle the given exception. For example, try reconnecting. */
void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e99b847..3c6cf57 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -845,7 +845,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
}
// add new peers into the rpc service
- getServerRpc().addPeers(peersInNewConf);
+ getServerRpc().addRaftPeers(peersInNewConf);
// add staging state into the leaderState
pending = leaderState.startSetConfiguration(request);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f52bc90..8d5e692 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -212,7 +212,7 @@ public class RaftServerProxy implements RaftServer {
private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group)
{
return CompletableFuture.supplyAsync(() -> {
try {
- serverRpc.addPeers(group.getPeers());
+ serverRpc.addRaftPeers(group.getPeers());
return new RaftServerImpl(group,
stateMachineRegistry.apply(group.getGroupId()), this);
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
index e3bcdf6..056bbea 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -26,6 +26,7 @@ import org.apache.ratis.util.PeerProxyMap;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -55,8 +56,8 @@ public abstract class RaftServerRpcWithProxy<PROXY extends
Closeable, PROXIES ex
}
@Override
- public void addPeers(Iterable<RaftPeer> peers) {
- getProxies().addPeers(peers);
+ public void addRaftPeers(Collection<RaftPeer> peers) {
+ getProxies().addRaftPeers(peers);
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 4ce061d..1905cef 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -370,7 +370,7 @@ public class ServerState implements Closeable {
void setRaftConf(long logIndex, RaftConfiguration conf) {
configurationManager.addConfiguration(logIndex, conf);
- server.getServerRpc().addPeers(conf.getPeers());
+ server.getServerRpc().addRaftPeers(conf.getPeers());
LOG.info("{}: set configuration {} at {}", getMemberId(), conf, logIndex);
LOG.trace("{}: {}", getMemberId(), configurationManager);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index e3eec8c..44046fc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -150,7 +150,7 @@ public abstract class RetryCacheTests<CLUSTER extends
MiniRaftCluster>
// same clientId and callId in the request
r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
callId, new SimpleMessage("message"));
- rpc.addServers(Arrays.asList(change.newPeers));
+ rpc.addRaftPeers(Arrays.asList(change.newPeers));
for (int i = 0; i < 10; i++) {
try {
assertReply(rpc.sendRequest(r), client, callId);
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index ea47e03..b71b746 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -22,6 +22,8 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
+import java.util.Collection;
+
class SimulatedClientRpc
extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
implements RaftClientRpc {
@@ -30,7 +32,7 @@ class SimulatedClientRpc
}
@Override
- public void addServers(Iterable<RaftPeer> servers) {
+ public void addRaftPeers(Collection<RaftPeer> servers) {
// do nothing
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 0a4243e..05f47f5 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -124,7 +125,7 @@ class SimulatedServerRpc implements RaftServerRpc {
}
@Override
- public void addPeers(Iterable<RaftPeer> peers) {
+ public void addRaftPeers(Collection<RaftPeer> peers) {
// do nothing
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 9af676d..1077697 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -135,7 +135,7 @@ public class TestDataStream extends BaseTest {
// only the first server routes requests to peers.
List<RaftPeer> otherPeers = new ArrayList<>(peers);
otherPeers.remove(peers.get(i));
- rpc.addPeers(otherPeers);
+ rpc.addRaftPeers(otherPeers);
}
rpc.start();
servers.add(streamServer);