This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 75af736d RATIS-1569. Move the asyncRpcApi.sendForward(..) call to the 
client side. (#635)
75af736d is described below

commit 75af736d0d20c44b5d47a458b0c7b33387a75a6d
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 20 07:39:44 2022 -0700

    RATIS-1569. Move the asyncRpcApi.sendForward(..) call to the client side. 
(#635)
---
 .../java/org/apache/ratis/client/AsyncRpcApi.java  |  7 ++--
 .../org/apache/ratis/client/DataStreamClient.java  | 24 +++++++++----
 .../java/org/apache/ratis/client/RaftClient.java   |  3 ++
 .../apache/ratis/client/impl/ClientImplUtils.java  |  5 +++
 .../ratis/client/impl/DataStreamClientImpl.java    | 34 ++++++++++++++++++-
 .../apache/ratis/client/impl/RaftClientImpl.java   |  7 ++--
 .../ratis/netty/server/DataStreamManagement.java   | 39 +++-------------------
 7 files changed, 69 insertions(+), 50 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java 
b/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java
index 68d536f8..29370755 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java
@@ -26,10 +26,9 @@ import java.util.concurrent.CompletableFuture;
 /** An RPC interface which extends the user interface {@link AsyncApi}. */
 public interface AsyncRpcApi extends AsyncApi {
   /**
-   * Send the given RaftClientRequest asynchronously to the raft service.
-   * The RaftClientRequest will wrapped as Message in a new RaftClientRequest
-   * and leader will be decode it from the Message
-   * @param request The RaftClientRequest.
+   * Send the given forward-request asynchronously to the raft service.
+   *
+   * @param request The request to be forwarded.
    * @return a future of the reply.
    */
   CompletableFuture<RaftClientReply> sendForward(RaftClientRequest request);
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index b00f1821..94b9252c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -42,7 +42,11 @@ public interface DataStreamClient extends DataStreamRpcApi, 
Closeable {
   DataStreamClientRpc getClientRpc();
 
   static Builder newBuilder() {
-    return new Builder();
+    return newBuilder(null);
+  }
+
+  static Builder newBuilder(RaftClient client) {
+    return new Builder(client);
   }
 
   /** To build {@link DataStreamClient} objects */
@@ -51,10 +55,14 @@ public interface DataStreamClient extends DataStreamRpcApi, 
Closeable {
     private DataStreamClientRpc dataStreamClientRpc;
     private RaftProperties properties;
     private Parameters parameters;
-    private RaftGroupId raftGroupId;
+    private RaftGroupId groupId;
     private ClientId clientId;
 
-    private Builder() {}
+    private final RaftClient client;
+
+    private Builder(RaftClient client) {
+      this.client = client;
+    }
 
     public DataStreamClient build() {
       Objects.requireNonNull(dataStreamServer, "The 'dataStreamServer' field 
is not initialized.");
@@ -65,9 +73,13 @@ public interface DataStreamClient extends DataStreamRpcApi, 
Closeable {
               .newDataStreamClientRpc(dataStreamServer, properties);
         }
       }
+      if (client != null) {
+        return ClientImplUtils.newDataStreamClient(
+            client, dataStreamServer, dataStreamClientRpc, properties);
+      }
       return ClientImplUtils.newDataStreamClient(
           Optional.ofNullable(clientId).orElseGet(ClientId::randomId),
-          raftGroupId, dataStreamServer, dataStreamClientRpc, properties);
+          groupId, dataStreamServer, dataStreamClientRpc, properties);
     }
 
     public Builder setClientId(ClientId clientId) {
@@ -75,8 +87,8 @@ public interface DataStreamClient extends DataStreamRpcApi, 
Closeable {
       return this;
     }
 
-    public Builder setRaftGroupId(RaftGroupId raftGroupId) {
-      this.raftGroupId = raftGroupId;
+    public Builder setGroupId(RaftGroupId groupId) {
+      this.groupId = groupId;
       return this;
     }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index d4c2f16e..60877b33 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -47,6 +47,9 @@ public interface RaftClient extends Closeable {
   /** @return the id of this client. */
   ClientId getId();
 
+  /** @return the group id of this client. */
+  RaftGroupId getGroupId();
+
   /** @return the cluster leaderId recorded by this client. */
   RaftPeerId getLeaderId();
 
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 392dcd7f..a69e9ffd 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -44,4 +44,9 @@ public interface ClientImplUtils {
       DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
     return new DataStreamClientImpl(clientId, groupId, 
primaryDataStreamServer, dataStreamClientRpc, properties);
   }
+
+  static DataStreamClient newDataStreamClient(RaftClient client, RaftPeer 
primaryDataStreamServer,
+      DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+    return new DataStreamClientImpl(client, primaryDataStreamServer, 
dataStreamClientRpc, properties);
+  }
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 7e04d2f2..8a2692a5 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -17,11 +17,14 @@
 */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.client.AsyncRpcApi;
 import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
@@ -54,6 +57,7 @@ import java.util.concurrent.CompletableFuture;
  * allows client to create streams and send asynchronously.
  */
 public class DataStreamClientImpl implements DataStreamClient {
+  private final RaftClient client;
   private final ClientId clientId;
   private final RaftGroupId groupId;
 
@@ -63,6 +67,7 @@ public class DataStreamClientImpl implements DataStreamClient 
{
 
   DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer 
dataStreamServer,
       DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+    this.client = null;
     this.clientId = clientId;
     this.groupId = groupId;
     this.dataStreamServer = dataStreamServer;
@@ -70,6 +75,16 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, 
properties);
   }
 
+  DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer,
+      DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+    this.client = client;
+    this.clientId = client.getId();
+    this.groupId = client.getGroupId();
+    this.dataStreamServer = dataStreamServer;
+    this.dataStreamClientRpc = dataStreamClientRpc;
+    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, 
properties);
+  }
+
   public final class DataStreamOutputImpl implements DataStreamOutputRpc {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
@@ -127,7 +142,7 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       }
       final CompletableFuture<DataStreamReply> f = 
combineHeader(send(Type.STREAM_DATA, data, length, options));
       if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
-        closeFuture = f;
+        closeFuture = client != null? f.thenCompose(this::sendForward): f;
         
f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
       }
       streamOffset += length;
@@ -172,6 +187,23 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     public WritableByteChannel getWritableByteChannel() {
       return writableByteChannelSupplier.get();
     }
+
+    private CompletableFuture<DataStreamReply> sendForward(DataStreamReply 
writeReply) {
+      if (!writeReply.isSuccess()) {
+        return CompletableFuture.completedFuture(writeReply);
+      }
+      final AsyncRpcApi asyncRpc = (AsyncRpcApi) client.async();
+      return asyncRpc.sendForward(header).thenApply(clientReply -> 
DataStreamReplyByteBuffer.newBuilder()
+          .setClientId(clientId)
+          .setType(writeReply.getType())
+          .setStreamId(writeReply.getStreamId())
+          .setStreamOffset(writeReply.getStreamOffset())
+          
.setBuffer(ClientProtoUtils.toRaftClientReplyProto(clientReply).toByteString().asReadOnlyByteBuffer())
+          .setSuccess(clientReply.isSuccess())
+          .setBytesWritten(writeReply.getBytesWritten())
+          .setCommitInfos(clientReply.getCommitInfos())
+          .build());
+    }
   }
 
   @Override
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7313c914..b4c59f40 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -167,9 +167,7 @@ public final class RaftClientImpl implements RaftClient {
     this.messageStreamApi = JavaUtils.memoize(() -> 
MessageStreamImpl.newInstance(this, properties));
     this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
     this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
-    this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder()
-        .setClientId(clientId)
-        .setRaftGroupId(groupId)
+    this.dataStreamApi = JavaUtils.memoize(() -> 
DataStreamClient.newBuilder(this)
         .setDataStreamServer(primaryDataStreamServer)
         .setProperties(properties)
         .setParameters(parameters)
@@ -182,7 +180,8 @@ public final class RaftClientImpl implements RaftClient {
     return leaderId;
   }
 
-  RaftGroupId getGroupId() {
+  @Override
+  public RaftGroupId getGroupId() {
     return groupId;
   }
 
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 81c3aa1b..ddef3ac9 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -19,7 +19,6 @@
 package org.apache.ratis.netty.server;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.apache.ratis.client.AsyncRpcApi;
 import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
@@ -324,14 +323,12 @@ public class DataStreamManagement {
   }
 
   static DataStreamReplyByteBuffer 
newDataStreamReplyByteBuffer(DataStreamRequestByteBuf request,
-      RaftClientReply reply, long bytesWritten, Collection<CommitInfoProto> 
commitInfos) {
+      RaftClientReply reply) {
     final ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
     return DataStreamReplyByteBuffer.newBuilder()
         .setDataStreamPacket(request)
         .setBuffer(buffer)
         .setSuccess(reply.isSuccess())
-        .setBytesWritten(bytesWritten)
-        .setCommitInfos(commitInfos)
         .build();
   }
 
@@ -349,28 +346,6 @@ public class DataStreamManagement {
     ctx.writeAndFlush(builder.build());
   }
 
-  private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info, 
DataStreamRequestByteBuf request,
-      long bytesWritten, ChannelHandlerContext ctx) {
-    final RequestMetrics metrics = 
getMetrics().newRequestMetrics(RequestType.START_TRANSACTION);
-    final RequestContext context = metrics.start();
-    try {
-      AsyncRpcApi asyncRpcApi = (AsyncRpcApi) 
(server.getDivision(info.getRequest()
-          .getRaftGroupId())
-          .getRaftClient()
-          .async());
-      return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply, 
e) -> {
-        metrics.stop(context, e == null);
-        if (e != null) {
-          replyDataStreamException(server, e, info.getRequest(), request, ctx);
-        } else {
-          ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 
bytesWritten, info.getCommitInfos()));
-        }
-      }, requestExecutor);
-    } catch (IOException e) {
-      throw new CompletionException(e);
-    }
-  }
-
   static void replyDataStreamException(RaftServer server, Throwable cause, 
RaftClientRequest raftClientRequest,
       DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
     final RaftClientReply reply = RaftClientReply.newBuilder()
@@ -394,7 +369,7 @@ public class DataStreamManagement {
       ChannelHandlerContext ctx) {
     LOG.warn("Failed to process {}",  request, throwable);
     try {
-      ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0, null));
+      ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
     } catch (Throwable t) {
       LOG.warn("Failed to sendDataStreamException {} for {}", throwable, 
request, t);
     }
@@ -452,15 +427,9 @@ public class DataStreamManagement {
     composeAsync(info.getPrevious(), requestExecutor, n -> 
JavaUtils.allOf(remoteWrites)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           if (request.getType() == Type.STREAM_HEADER
-              || (request.getType() == Type.STREAM_DATA && !close)) {
+              || request.getType() == Type.STREAM_DATA
+              || close) {
             sendReply(remoteWrites, request, bytesWritten, 
info.getCommitInfos(), ctx);
-          } else if (close) {
-            if (info.isPrimary()) {
-              // after all server close stream, primary server start 
transaction
-              startTransaction(info, request, bytesWritten, ctx);
-            } else {
-              sendReply(remoteWrites, request, bytesWritten, 
info.getCommitInfos(), ctx);
-            }
           } else {
             throw new IllegalStateException(this + ": Unexpected type " + 
request.getType() + ", request=" + request);
           }

Reply via email to