This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ed64ee  RATIS-1110. Fix DataStreamReply out of order. (#235)
6ed64ee is described below

commit 6ed64ee285a9469064a7cd2f368e8147a6bc6cbd
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 28 21:00:20 2020 +0800

    RATIS-1110. Fix DataStreamReply out of order. (#235)
---
 .../apache/ratis/netty/server/NettyServerStreamRpc.java    |  4 ++--
 .../java/org/apache/ratis/datastream/TestDataStream.java   | 14 +++++++-------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 4d2a43e..3cf52e2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -253,8 +253,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         }
 
         final CompletableFuture<?> current = previous.get()
-            .thenCombine(JavaUtils.allOf(remoteWrites), (u, v) -> null)
-            .thenCombine(localWrite, (v, bytesWritten) -> {
+            .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
+            .thenCombineAsync(localWrite, (v, bytesWritten) -> {
               buf.release();
               sendReply(remoteWrites, request, bytesWritten, ctx);
               return null;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index f398174..af930f7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -157,15 +157,17 @@ public class TestDataStream extends BaseTest {
 
   @Test
   public void testDataStreamSingleServer() throws Exception {
-    runTestDataStream(1);
+    runTestDataStream(1, 1_000_000, 100);
+    runTestDataStream(1,1_000, 10_000);
   }
 
   @Test
   public void testDataStreamMultipleServer() throws Exception {
-    runTestDataStream(3);
+    runTestDataStream(3, 1_000_000, 100);
+    runTestDataStream(3, 1_000, 10_000);
   }
 
-  void runTestDataStream(int numServers) throws Exception {
+  void runTestDataStream(int numServers, int bufferSize, int bufferNum) throws Exception {
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
         .map(RaftPeerId::valueOf)
@@ -175,15 +177,13 @@ public class TestDataStream extends BaseTest {
     setupServer();
     setupClient();
     try {
-      runTestDataStream();
+      runTestDataStream(bufferSize, bufferNum);
     } finally {
       shutdown();
     }
   }
 
-  public void runTestDataStream(){
-    final int bufferSize = 1024*1024;
-    final int bufferNum = 10;
+  void runTestDataStream(int bufferSize, int bufferNum) {
     final DataStreamOutput out = client.stream();
     DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out;
 

Reply via email to