This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new beb3e3d  RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)
beb3e3d is described below

commit beb3e3ded54432ae23a80e35b5abb1e5d466b08e
Author: hao guo <[email protected]>
AuthorDate: Wed Dec 22 23:25:08 2021 +0800

    RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)
---
 .../apache/ratis/client/impl/DataStreamClientImpl.java   |  7 +++++--
 .../org/apache/ratis/client/impl/OrderedStreamAsync.java | 16 ++++++++--------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 29356f6..7e04d2f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -41,6 +41,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.SlidingWindow;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -66,12 +67,13 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     this.groupId = groupId;
     this.dataStreamServer = dataStreamServer;
     this.dataStreamClientRpc = dataStreamClientRpc;
-    this.orderedStreamAsync = new OrderedStreamAsync(clientId, 
dataStreamClientRpc, properties);
+    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, 
properties);
   }
 
   public final class DataStreamOutputImpl implements DataStreamOutputRpc {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
+    private final 
SlidingWindow.Client<OrderedStreamAsync.DataStreamWindowRequest, 
DataStreamReply> slidingWindow;
     private final CompletableFuture<RaftClientReply> raftClientReplyFuture = 
new CompletableFuture<>();
     private CompletableFuture<DataStreamReply> closeFuture;
     private final MemoizedSupplier<WritableByteChannel> 
writableByteChannelSupplier
@@ -103,6 +105,7 @@ public class DataStreamClientImpl implements 
DataStreamClient {
 
     private DataStreamOutputImpl(RaftClientRequest request) {
       this.header = request;
+      this.slidingWindow = new 
SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId, 
header.getCallId()));
       final ByteBuffer buffer = 
ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
       this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
     }
@@ -110,7 +113,7 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     private CompletableFuture<DataStreamReply> send(Type type, Object data, 
long length, WriteOption... options) {
       final DataStreamRequestHeader h =
           new DataStreamRequestHeader(header.getClientId(), type, 
header.getCallId(), streamOffset, length, options);
-      return orderedStreamAsync.sendRequest(h, data);
+      return orderedStreamAsync.sendRequest(h, data, slidingWindow);
     }
 
     private CompletableFuture<DataStreamReply> 
combineHeader(CompletableFuture<DataStreamReply> future) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index ffe1b1e..d9f5862 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -24,7 +24,6 @@ import 
org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.FilePositionCount;
-import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
@@ -103,19 +102,19 @@ public class OrderedStreamAsync {
   }
 
   private final DataStreamClientRpc dataStreamClientRpc;
-  private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> 
slidingWindow;
+
   private final Semaphore requestSemaphore;
   private final TimeDuration requestTimeout;
   private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
-  OrderedStreamAsync(ClientId clientId, DataStreamClientRpc 
dataStreamClientRpc, RaftProperties properties){
+  OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties 
properties){
     this.dataStreamClientRpc = dataStreamClientRpc;
-    this.slidingWindow = new SlidingWindow.Client<>(clientId);
     this.requestSemaphore = new 
Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
     this.requestTimeout = 
RaftClientConfigKeys.DataStream.requestTimeout(properties);
   }
 
-  CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader 
header, Object data) {
+  CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader 
header, Object data,
+      SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> 
slidingWindow) {
     try {
       requestSemaphore.acquire();
     } catch (InterruptedException e){
@@ -124,7 +123,7 @@ public class OrderedStreamAsync {
     }
     final LongFunction<DataStreamWindowRequest> constructor
         = seqNum -> new DataStreamWindowRequest(header, data, seqNum);
-    return slidingWindow.submitNewRequest(constructor, 
this::sendRequestToNetwork).
+    return slidingWindow.submitNewRequest(constructor, r -> 
sendRequestToNetwork(r, slidingWindow)).
            getReplyFuture().whenComplete((r, e) -> {
              if (e != null) {
                LOG.error("Failed to send request, header=" + header, e);
@@ -133,7 +132,8 @@ public class OrderedStreamAsync {
            });
   }
 
-  private void sendRequestToNetwork(DataStreamWindowRequest request){
+  private void sendRequestToNetwork(DataStreamWindowRequest request,
+      SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> 
slidingWindow) {
     CompletableFuture<DataStreamReply> f = request.getReplyFuture();
     if(f.isDone()) {
       return;
@@ -149,7 +149,7 @@ public class OrderedStreamAsync {
 
     requestFuture.thenApply(reply -> {
       slidingWindow.receiveReply(
-          seqNum, reply, this::sendRequestToNetwork);
+          seqNum, reply, r -> sendRequestToNetwork(r, slidingWindow));
       return reply;
     }).thenAccept(reply -> {
       if (f.isDone()) {

Reply via email to