amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502204193
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
}
}
- private RaftPeer[] peers;
+ private List<RaftPeer> peers;
private RaftProperties properties;
- private DataStreamServerImpl server;
+ private List<DataStreamServerImpl> servers;
private DataStreamClientImpl client;
private int byteWritten = 0;
- public void setupServer(){
- server = new DataStreamServerImpl(peers[0], new
SingleDataStreamStateMachine(), properties, null);
- server.getServerRpc().startServer();
+ private void setupServer(){
+ servers = new ArrayList<>(peers.size());
+ // start stream servers on raft peers.
+ for (int i = 0; i < peers.size(); i++) {
+ if (i == 0) {
+ // only the first server routes requests to peers.
+ List<RaftPeer> otherPeers = new ArrayList<>(peers);
+ otherPeers.remove(peers.get(i));
+ DataStreamServerImpl streamServer = new DataStreamServerImpl(
+ peers.get(i), properties, null, new
SingleDataStreamStateMachine(), otherPeers);
+ servers.add(streamServer);
+ streamServer.getServerRpc().startServer();
+ } else {
+ DataStreamServerImpl streamServer = new DataStreamServerImpl(
+ peers.get(i), new SingleDataStreamStateMachine(), properties,
null);
+ servers.add(streamServer);
+ streamServer.getServerRpc().startServer();
+ }
+ }
+
+ // start peer clients on stream servers
+ for (DataStreamServerImpl streamServer : servers) {
+ streamServer.getServerRpc().startClientToPeers();
+ }
}
- public void setupClient(){
- client = new DataStreamClientImpl(peers[0], properties, null);
+ private void setupClient(){
+ client = new DataStreamClientImpl(peers.get(0), properties, null);
client.start();
}
public void shutDownSetup(){
client.close();
- server.close();
+ servers.stream().forEach(s -> s.close());
}
@Test
public void testDataStream(){
properties = new RaftProperties();
peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
.map(RaftPeerId::valueOf)
- .map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress()))
- .toArray(RaftPeer[]::new);
+ .map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress())).collect(
+ Collectors.toList());
+
+ setupServer();
+ setupClient();
+ runTestDataStream();
+ }
+
+ @Test
+ public void testDataStreamMultipleServer(){
Review comment:
Yes. I was wondering whether we need a read API to verify that the write is
complete. I do need a suggestion here: do we need a read API to have a better
test?
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
}
}
- private RaftPeer[] peers;
+ private List<RaftPeer> peers;
private RaftProperties properties;
- private DataStreamServerImpl server;
+ private List<DataStreamServerImpl> servers;
private DataStreamClientImpl client;
private int byteWritten = 0;
- public void setupServer(){
- server = new DataStreamServerImpl(peers[0], new
SingleDataStreamStateMachine(), properties, null);
- server.getServerRpc().startServer();
+ private void setupServer(){
+ servers = new ArrayList<>(peers.size());
+ // start stream servers on raft peers.
+ for (int i = 0; i < peers.size(); i++) {
+ if (i == 0) {
+ // only the first server routes requests to peers.
+ List<RaftPeer> otherPeers = new ArrayList<>(peers);
+ otherPeers.remove(peers.get(i));
+ DataStreamServerImpl streamServer = new DataStreamServerImpl(
+ peers.get(i), properties, null, new
SingleDataStreamStateMachine(), otherPeers);
+ servers.add(streamServer);
+ streamServer.getServerRpc().startServer();
+ } else {
+ DataStreamServerImpl streamServer = new DataStreamServerImpl(
+ peers.get(i), new SingleDataStreamStateMachine(), properties,
null);
+ servers.add(streamServer);
+ streamServer.getServerRpc().startServer();
+ }
+ }
+
+ // start peer clients on stream servers
+ for (DataStreamServerImpl streamServer : servers) {
+ streamServer.getServerRpc().startClientToPeers();
+ }
}
- public void setupClient(){
- client = new DataStreamClientImpl(peers[0], properties, null);
+ private void setupClient(){
+ client = new DataStreamClientImpl(peers.get(0), properties, null);
client.start();
}
public void shutDownSetup(){
client.close();
- server.close();
+ servers.stream().forEach(s -> s.close());
}
@Test
public void testDataStream(){
properties = new RaftProperties();
peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
.map(RaftPeerId::valueOf)
- .map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress()))
- .toArray(RaftPeer[]::new);
+ .map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress())).collect(
+ Collectors.toList());
+
+ setupServer();
+ setupClient();
+ runTestDataStream();
+ }
+
+ @Test
+ public void testDataStreamMultipleServer(){
Review comment:
Yes. I was wondering whether we need a read API to verify that the write is
complete. I do need a suggestion here: do we need a read API to have a better
test? E.g., use the read API to verify that all writes have succeeded?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]