This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cd9f97  RATIS-1103. Add an interface for adding RaftPeers. (#227)
6cd9f97 is described below

commit 6cd9f979164f24d9bb73c9e8a71239df71686c92
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 21 09:57:42 2020 +0800

    RATIS-1103. Add an interface for adding RaftPeers. (#227)
---
 .../java/org/apache/ratis/client/DataStreamClient.java     |  3 ---
 .../main/java/org/apache/ratis/client/RaftClientRpc.java   |  5 +----
 .../org/apache/ratis/client/impl/DataStreamClientImpl.java |  5 -----
 .../org/apache/ratis/client/impl/GroupManagementImpl.java  |  2 +-
 .../java/org/apache/ratis/client/impl/RaftClientImpl.java  | 12 +++---------
 .../apache/ratis/client/impl/RaftClientRpcWithProxy.java   |  5 +++--
 .../src/main/java/org/apache/ratis/protocol/RaftPeer.java  | 14 +++++++++++++-
 .../src/main/java/org/apache/ratis/util/PeerProxyMap.java  |  6 ++++--
 .../org/apache/ratis/grpc/client/GrpcClientStreamer.java   |  2 +-
 .../apache/ratis/netty/server/NettyServerStreamRpc.java    |  4 ++--
 .../java/org/apache/ratis/server/DataStreamServerRpc.java  |  6 +-----
 .../main/java/org/apache/ratis/server/RaftServerRpc.java   |  5 +----
 .../java/org/apache/ratis/server/impl/RaftServerImpl.java  |  2 +-
 .../java/org/apache/ratis/server/impl/RaftServerProxy.java |  2 +-
 .../apache/ratis/server/impl/RaftServerRpcWithProxy.java   |  7 ++++---
 .../java/org/apache/ratis/server/impl/ServerState.java     |  2 +-
 .../src/test/java/org/apache/ratis/RetryCacheTests.java    |  2 +-
 .../apache/ratis/server/simulation/SimulatedClientRpc.java |  4 +++-
 .../apache/ratis/server/simulation/SimulatedServerRpc.java |  3 ++-
 .../java/org/apache/ratis/datastream/TestDataStream.java   |  2 +-
 20 files changed, 44 insertions(+), 49 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index b72e1fd..9122763 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -38,9 +38,6 @@ public interface DataStreamClient extends DataStreamApi, Closeable {
   /** Return the rpc client instance **/
   DataStreamClientRpc getClientRpc();
 
-  /** add information of the raft peers to communicate with */
-  void addPeers(Iterable<RaftPeer> peers);
-
   /** start the client */
   void start();
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index cf8300e..9034829 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 /** The client side rpc of a raft service. */
-public interface RaftClientRpc extends Closeable {
+public interface RaftClientRpc extends RaftPeer.Add, Closeable {
   /** Async call to send a request. */
   default CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
     throw new UnsupportedOperationException(getClass() + " does not support this method.");
@@ -44,9 +44,6 @@ public interface RaftClientRpc extends Closeable {
   /** Send a request. */
   RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
 
-  /** Add the information of the given raft servers */
-  void addServers(Iterable<RaftPeer> servers);
-
   /**
    * Handle the given throwable. For example, try reconnecting.
    *
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 9149b74..0bfd662 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -115,11 +115,6 @@ public class DataStreamClientImpl implements DataStreamClient {
   }
 
   @Override
-  public void addPeers(Iterable<RaftPeer> peers) {
-    return;
-  }
-
-  @Override
   public void close(){
     dataStreamClientRpc.closeClient();
   }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index fab45b3..e8566df 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -46,7 +46,7 @@ class GroupManagementImpl implements GroupManagementApi {
     Objects.requireNonNull(newGroup, "newGroup == null");
 
     final long callId = RaftClientImpl.nextCallId();
-    client.addServers(newGroup.getPeers().stream());
+    client.getClientRpc().addRaftPeers(newGroup.getPeers());
     return client.io().sendRequest(GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup));
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7105657..4ff170a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -60,7 +60,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Stream;
 
 /** A client who sends requests to a raft service. */
 public final class RaftClientImpl implements RaftClient {
@@ -133,7 +132,7 @@ public final class RaftClientImpl implements RaftClient {
     this.retryPolicy = retryPolicy;
 
     scheduler = TimeoutScheduler.getInstance();
-    clientRpc.addServers(peers);
+    clientRpc.addRaftPeers(peers);
 
     this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, properties));
     this.streamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
@@ -209,7 +208,7 @@ public final class RaftClientImpl implements RaftClient {
 
     final long callId = nextCallId();
     // also refresh the rpc proxies for these peers
-    addServers(Arrays.stream(peersInNewConf));
+    clientRpc.addRaftPeers(peersInNewConf);
     return io().sendRequestWithRetry(() -> new SetConfigurationRequest(
         clientId, leaderId, groupId, callId, Arrays.asList(peersInNewConf)));
   }
@@ -229,11 +228,6 @@ public final class RaftClientImpl implements RaftClient {
     return blockingApi.get();
   }
 
-  void addServers(Stream<RaftPeer> peersInNewConf) {
-    clientRpc.addServers(
-        peersInNewConf.filter(p -> !peers.contains(p))::iterator);
-  }
-
   Throwable noMoreRetries(ClientRetryEvent event) {
     final int attemptCount = event.getAttemptCount();
     final Throwable throwable = event.getCause();
@@ -284,7 +278,7 @@ public final class RaftClientImpl implements RaftClient {
       peers.clear();
       peers.addAll(newPeers);
       // also refresh the rpc proxies for these peers
-      clientRpc.addServers(newPeers);
+      clientRpc.addRaftPeers(newPeers);
     }
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
index b5fbf48..281ceb3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.PeerProxyMap;
 
 import java.io.Closeable;
+import java.util.Collection;
 
 /** An abstract {@link RaftClientRpc} implementation using {@link PeerProxyMap}. */
 public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
@@ -38,8 +39,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
   }
 
   @Override
-  public void addServers(Iterable<RaftPeer> servers) {
-    proxies.addPeers(servers);
+  public void addRaftPeers(Collection<RaftPeer> servers) {
+    proxies.addRaftPeers(servers);
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index d46ef4a..58e5a65 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,6 +22,8 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Objects;
 import java.util.function.Supplier;
 
@@ -38,6 +40,16 @@ public class RaftPeer {
     return EMPTY_ARRAY;
   }
 
+  public interface Add {
+    /** Add the given peers. */
+    void addRaftPeers(Collection<RaftPeer> peers);
+
+    /** Add the given peers. */
+    default void addRaftPeers(RaftPeer... peers) {
+      addRaftPeers(Arrays.asList(peers));
+    }
+  }
+
   /** The id of the peer. */
   private final RaftPeerId id;
   /** The address of the peer. */
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 80f026e..4e15a40 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -26,13 +26,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** A map from peer id to peer and its proxy. */
-public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
+public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Closeable {
   public static final Logger LOG = LoggerFactory.getLogger(PeerProxyMap.class);
 
   /** Peer and its proxy. */
@@ -113,7 +114,8 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
     return p.getProxy();
   }
 
-  public void addPeers(Iterable<RaftPeer> newPeers) {
+  @Override
+  public void addRaftPeers(Collection<RaftPeer> newPeers) {
     for(RaftPeer p : newPeers) {
       computeIfAbsent(p);
     }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
index 7f53453..ccb182f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
@@ -101,7 +101,7 @@ public class GrpcClientStreamer implements Closeable {
     proxyMap = new PeerProxyMap<>(clientId.toString(),
         raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer,
             ResponseHandler::new, prop, tlsConfig));
-    proxyMap.addPeers(group.getPeers());
+    proxyMap.addRaftPeers(group.getPeers());
     refreshLeaderProxy(leaderId, null);
 
     senderThread = new Sender();
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 385a9f7..5f41c0f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -78,7 +78,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
     void addPeers(Collection<RaftPeer> newPeers) {
       // add to the map first in order to preserve the invariant.
-      map.addPeers(newPeers);
+      map.addRaftPeers(newPeers);
       // must use atomic addAll
       peers.addAll(newPeers);
     }
@@ -145,7 +145,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   }
 
   @Override
-  public void addPeers(Collection<RaftPeer> newPeers) {
+  public void addRaftPeers(Collection<RaftPeer> newPeers) {
     proxies.addPeers(newPeers);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index ff12b57..2653000 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -20,18 +20,14 @@ package org.apache.ratis.server;
 import org.apache.ratis.protocol.RaftPeer;
 
 import java.io.Closeable;
-import java.util.Collection;
 
 /**
  * A server interface handling incoming streams
  * Relays those streams to other servers after persisting
  */
-public interface DataStreamServerRpc extends Closeable {
+public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
   /**
    * start server
    */
   void start();
-
-  /** Add the given peers */
-  void addPeers(Collection<RaftPeer> peers);
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index ee34d88..6172f1e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -32,7 +32,7 @@ import java.util.Objects;
  * An server-side interface for supporting different RPC implementations
  * such as Netty, gRPC and Hadoop.
  */
-public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeable {
+public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, RaftPeer.Add, Closeable {
   /** To build {@link RaftServerRpc} objects. */
   abstract class Builder<B extends Builder, RPC extends RaftServerRpc> {
     private RaftServer server;
@@ -58,9 +58,6 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl
   /** @return the address where this RPC server is listening to. */
   InetSocketAddress getInetSocketAddress();
 
-  /** add information of the given peers */
-  void addPeers(Iterable<RaftPeer> peers);
-
   /** Handle the given exception.  For example, try reconnecting. */
   void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e99b847..3c6cf57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -845,7 +845,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       }
 
       // add new peers into the rpc service
-      getServerRpc().addPeers(peersInNewConf);
+      getServerRpc().addRaftPeers(peersInNewConf);
       // add staging state into the leaderState
       pending = leaderState.startSetConfiguration(request);
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f52bc90..8d5e692 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -212,7 +212,7 @@ public class RaftServerProxy implements RaftServer {
   private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
     return CompletableFuture.supplyAsync(() -> {
       try {
-        serverRpc.addPeers(group.getPeers());
+        serverRpc.addRaftPeers(group.getPeers());
         return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
       } catch(IOException e) {
         throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
index e3bcdf6..056bbea 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,6 +26,7 @@ import org.apache.ratis.util.PeerProxyMap;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -55,8 +56,8 @@ public abstract class RaftServerRpcWithProxy<PROXY extends Closeable, PROXIES ex
   }
 
   @Override
-  public void addPeers(Iterable<RaftPeer> peers) {
-    getProxies().addPeers(peers);
+  public void addRaftPeers(Collection<RaftPeer> peers) {
+    getProxies().addRaftPeers(peers);
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 4ce061d..1905cef 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -370,7 +370,7 @@ public class ServerState implements Closeable {
 
   void setRaftConf(long logIndex, RaftConfiguration conf) {
     configurationManager.addConfiguration(logIndex, conf);
-    server.getServerRpc().addPeers(conf.getPeers());
+    server.getServerRpc().addRaftPeers(conf.getPeers());
     LOG.info("{}: set configuration {} at {}", getMemberId(), conf, logIndex);
     LOG.trace("{}: {}", getMemberId(), configurationManager);
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index e3eec8c..44046fc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -150,7 +150,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
       // same clientId and callId in the request
       r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
               callId, new SimpleMessage("message"));
-      rpc.addServers(Arrays.asList(change.newPeers));
+      rpc.addRaftPeers(Arrays.asList(change.newPeers));
       for (int i = 0; i < 10; i++) {
         try {
           assertReply(rpc.sendRequest(r), client, callId);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index ea47e03..b71b746 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -22,6 +22,8 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 
+import java.util.Collection;
+
 class SimulatedClientRpc
     extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
     implements RaftClientRpc {
@@ -30,7 +32,7 @@ class SimulatedClientRpc
   }
 
   @Override
-  public void addServers(Iterable<RaftPeer> servers) {
+  public void addRaftPeers(Collection<RaftPeer> servers) {
     // do nothing
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 0a4243e..05f47f5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -124,7 +125,7 @@ class SimulatedServerRpc implements RaftServerRpc {
   }
 
   @Override
-  public void addPeers(Iterable<RaftPeer> peers) {
+  public void addRaftPeers(Collection<RaftPeer> peers) {
     // do nothing
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 9af676d..1077697 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -135,7 +135,7 @@ public class TestDataStream extends BaseTest {
         // only the first server routes requests to peers.
         List<RaftPeer> otherPeers = new ArrayList<>(peers);
         otherPeers.remove(peers.get(i));
-        rpc.addPeers(otherPeers);
+        rpc.addRaftPeers(otherPeers);
       }
       rpc.start();
       servers.add(streamServer);

Reply via email to