This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6ed64ee RATIS-1110. Fix DataStreamReply out of order. (#235)
6ed64ee is described below
commit 6ed64ee285a9469064a7cd2f368e8147a6bc6cbd
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 28 21:00:20 2020 +0800
RATIS-1110. Fix DataStreamReply out of order. (#235)
---
.../apache/ratis/netty/server/NettyServerStreamRpc.java | 4 ++--
.../java/org/apache/ratis/datastream/TestDataStream.java | 14 +++++++-------
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 4d2a43e..3cf52e2 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -253,8 +253,8 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
final CompletableFuture<?> current = previous.get()
- .thenCombine(JavaUtils.allOf(remoteWrites), (u, v) -> null)
- .thenCombine(localWrite, (v, bytesWritten) -> {
+ .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
+ .thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
sendReply(remoteWrites, request, bytesWritten, ctx);
return null;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index f398174..af930f7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -157,15 +157,17 @@ public class TestDataStream extends BaseTest {
@Test
public void testDataStreamSingleServer() throws Exception {
- runTestDataStream(1);
+ runTestDataStream(1, 1_000_000, 100);
+ runTestDataStream(1,1_000, 10_000);
}
@Test
public void testDataStreamMultipleServer() throws Exception {
- runTestDataStream(3);
+ runTestDataStream(3, 1_000_000, 100);
+ runTestDataStream(3, 1_000, 10_000);
}
- void runTestDataStream(int numServers) throws Exception {
+ void runTestDataStream(int numServers, int bufferSize, int bufferNum) throws
Exception {
properties = new RaftProperties();
peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
.map(RaftPeerId::valueOf)
@@ -175,15 +177,13 @@ public class TestDataStream extends BaseTest {
setupServer();
setupClient();
try {
- runTestDataStream();
+ runTestDataStream(bufferSize, bufferNum);
} finally {
shutdown();
}
}
- public void runTestDataStream(){
- final int bufferSize = 1024*1024;
- final int bufferNum = 10;
+ void runTestDataStream(int bufferSize, int bufferNum) {
final DataStreamOutput out = client.stream();
DataStreamClientImpl.DataStreamOutputImpl impl =
(DataStreamClientImpl.DataStreamOutputImpl) out;