This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 75af736d RATIS-1569. Move the asyncRpcApi.sendForward(..) call to the client side. (#635)
75af736d is described below
commit 75af736d0d20c44b5d47a458b0c7b33387a75a6d
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 20 07:39:44 2022 -0700
RATIS-1569. Move the asyncRpcApi.sendForward(..) call to the client side. (#635)
---
.../java/org/apache/ratis/client/AsyncRpcApi.java | 7 ++--
.../org/apache/ratis/client/DataStreamClient.java | 24 +++++++++----
.../java/org/apache/ratis/client/RaftClient.java | 3 ++
.../apache/ratis/client/impl/ClientImplUtils.java | 5 +++
.../ratis/client/impl/DataStreamClientImpl.java | 34 ++++++++++++++++++-
.../apache/ratis/client/impl/RaftClientImpl.java | 7 ++--
.../ratis/netty/server/DataStreamManagement.java | 39 +++-------------------
7 files changed, 69 insertions(+), 50 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java b/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java
index 68d536f8..29370755 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java
@@ -26,10 +26,9 @@ import java.util.concurrent.CompletableFuture;
/** An RPC interface which extends the user interface {@link AsyncApi}. */
public interface AsyncRpcApi extends AsyncApi {
/**
- * Send the given RaftClientRequest asynchronously to the raft service.
- * The RaftClientRequest will wrapped as Message in a new RaftClientRequest
- * and leader will be decode it from the Message
- * @param request The RaftClientRequest.
+ * Send the given forward-request asynchronously to the raft service.
+ *
+ * @param request The request to be forwarded.
* @return a future of the reply.
*/
CompletableFuture<RaftClientReply> sendForward(RaftClientRequest request);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index b00f1821..94b9252c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -42,7 +42,11 @@ public interface DataStreamClient extends DataStreamRpcApi,
Closeable {
DataStreamClientRpc getClientRpc();
static Builder newBuilder() {
- return new Builder();
+ return newBuilder(null);
+ }
+
+ static Builder newBuilder(RaftClient client) {
+ return new Builder(client);
}
/** To build {@link DataStreamClient} objects */
@@ -51,10 +55,14 @@ public interface DataStreamClient extends DataStreamRpcApi,
Closeable {
private DataStreamClientRpc dataStreamClientRpc;
private RaftProperties properties;
private Parameters parameters;
- private RaftGroupId raftGroupId;
+ private RaftGroupId groupId;
private ClientId clientId;
- private Builder() {}
+ private final RaftClient client;
+
+ private Builder(RaftClient client) {
+ this.client = client;
+ }
public DataStreamClient build() {
Objects.requireNonNull(dataStreamServer, "The 'dataStreamServer' field
is not initialized.");
@@ -65,9 +73,13 @@ public interface DataStreamClient extends DataStreamRpcApi,
Closeable {
.newDataStreamClientRpc(dataStreamServer, properties);
}
}
+ if (client != null) {
+ return ClientImplUtils.newDataStreamClient(
+ client, dataStreamServer, dataStreamClientRpc, properties);
+ }
return ClientImplUtils.newDataStreamClient(
Optional.ofNullable(clientId).orElseGet(ClientId::randomId),
- raftGroupId, dataStreamServer, dataStreamClientRpc, properties);
+ groupId, dataStreamServer, dataStreamClientRpc, properties);
}
public Builder setClientId(ClientId clientId) {
@@ -75,8 +87,8 @@ public interface DataStreamClient extends DataStreamRpcApi,
Closeable {
return this;
}
- public Builder setRaftGroupId(RaftGroupId raftGroupId) {
- this.raftGroupId = raftGroupId;
+ public Builder setGroupId(RaftGroupId groupId) {
+ this.groupId = groupId;
return this;
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index d4c2f16e..60877b33 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -47,6 +47,9 @@ public interface RaftClient extends Closeable {
/** @return the id of this client. */
ClientId getId();
+ /** @return the group id of this client. */
+ RaftGroupId getGroupId();
+
/** @return the cluster leaderId recorded by this client. */
RaftPeerId getLeaderId();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 392dcd7f..a69e9ffd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -44,4 +44,9 @@ public interface ClientImplUtils {
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
return new DataStreamClientImpl(clientId, groupId,
primaryDataStreamServer, dataStreamClientRpc, properties);
}
+
+ static DataStreamClient newDataStreamClient(RaftClient client, RaftPeer
primaryDataStreamServer,
+ DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+ return new DataStreamClientImpl(client, primaryDataStreamServer,
dataStreamClientRpc, properties);
+ }
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 7e04d2f2..8a2692a5 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -17,11 +17,14 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
@@ -54,6 +57,7 @@ import java.util.concurrent.CompletableFuture;
* allows client to create streams and send asynchronously.
*/
public class DataStreamClientImpl implements DataStreamClient {
+ private final RaftClient client;
private final ClientId clientId;
private final RaftGroupId groupId;
@@ -63,6 +67,7 @@ public class DataStreamClientImpl implements DataStreamClient
{
DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer
dataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+ this.client = null;
this.clientId = clientId;
this.groupId = groupId;
this.dataStreamServer = dataStreamServer;
@@ -70,6 +75,16 @@ public class DataStreamClientImpl implements
DataStreamClient {
this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
}
+ DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer,
+ DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
+ this.client = client;
+ this.clientId = client.getId();
+ this.groupId = client.getGroupId();
+ this.dataStreamServer = dataStreamServer;
+ this.dataStreamClientRpc = dataStreamClientRpc;
+ this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
+ }
+
public final class DataStreamOutputImpl implements DataStreamOutputRpc {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
@@ -127,7 +142,7 @@ public class DataStreamClientImpl implements
DataStreamClient {
}
final CompletableFuture<DataStreamReply> f =
combineHeader(send(Type.STREAM_DATA, data, length, options));
if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
- closeFuture = f;
+ closeFuture = client != null? f.thenCompose(this::sendForward): f;
f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
}
streamOffset += length;
@@ -172,6 +187,23 @@ public class DataStreamClientImpl implements
DataStreamClient {
public WritableByteChannel getWritableByteChannel() {
return writableByteChannelSupplier.get();
}
+
+ private CompletableFuture<DataStreamReply> sendForward(DataStreamReply
writeReply) {
+ if (!writeReply.isSuccess()) {
+ return CompletableFuture.completedFuture(writeReply);
+ }
+ final AsyncRpcApi asyncRpc = (AsyncRpcApi) client.async();
+ return asyncRpc.sendForward(header).thenApply(clientReply ->
DataStreamReplyByteBuffer.newBuilder()
+ .setClientId(clientId)
+ .setType(writeReply.getType())
+ .setStreamId(writeReply.getStreamId())
+ .setStreamOffset(writeReply.getStreamOffset())
+
.setBuffer(ClientProtoUtils.toRaftClientReplyProto(clientReply).toByteString().asReadOnlyByteBuffer())
+ .setSuccess(clientReply.isSuccess())
+ .setBytesWritten(writeReply.getBytesWritten())
+ .setCommitInfos(clientReply.getCommitInfos())
+ .build());
+ }
}
@Override
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7313c914..b4c59f40 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -167,9 +167,7 @@ public final class RaftClientImpl implements RaftClient {
this.messageStreamApi = JavaUtils.memoize(() ->
MessageStreamImpl.newInstance(this, properties));
this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
- this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder()
- .setClientId(clientId)
- .setRaftGroupId(groupId)
+ this.dataStreamApi = JavaUtils.memoize(() ->
DataStreamClient.newBuilder(this)
.setDataStreamServer(primaryDataStreamServer)
.setProperties(properties)
.setParameters(parameters)
@@ -182,7 +180,8 @@ public final class RaftClientImpl implements RaftClient {
return leaderId;
}
- RaftGroupId getGroupId() {
+ @Override
+ public RaftGroupId getGroupId() {
return groupId;
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 81c3aa1b..ddef3ac9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -19,7 +19,6 @@
package org.apache.ratis.netty.server;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
@@ -324,14 +323,12 @@ public class DataStreamManagement {
}
static DataStreamReplyByteBuffer
newDataStreamReplyByteBuffer(DataStreamRequestByteBuf request,
- RaftClientReply reply, long bytesWritten, Collection<CommitInfoProto>
commitInfos) {
+ RaftClientReply reply) {
final ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
return DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
.setBuffer(buffer)
.setSuccess(reply.isSuccess())
- .setBytesWritten(bytesWritten)
- .setCommitInfos(commitInfos)
.build();
}
@@ -349,28 +346,6 @@ public class DataStreamManagement {
ctx.writeAndFlush(builder.build());
}
- private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info,
DataStreamRequestByteBuf request,
- long bytesWritten, ChannelHandlerContext ctx) {
- final RequestMetrics metrics =
getMetrics().newRequestMetrics(RequestType.START_TRANSACTION);
- final RequestContext context = metrics.start();
- try {
- AsyncRpcApi asyncRpcApi = (AsyncRpcApi)
(server.getDivision(info.getRequest()
- .getRaftGroupId())
- .getRaftClient()
- .async());
- return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply,
e) -> {
- metrics.stop(context, e == null);
- if (e != null) {
- replyDataStreamException(server, e, info.getRequest(), request, ctx);
- } else {
- ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply,
bytesWritten, info.getCommitInfos()));
- }
- }, requestExecutor);
- } catch (IOException e) {
- throw new CompletionException(e);
- }
- }
-
static void replyDataStreamException(RaftServer server, Throwable cause,
RaftClientRequest raftClientRequest,
DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
final RaftClientReply reply = RaftClientReply.newBuilder()
@@ -394,7 +369,7 @@ public class DataStreamManagement {
ChannelHandlerContext ctx) {
LOG.warn("Failed to process {}", request, throwable);
try {
- ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0, null));
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} catch (Throwable t) {
LOG.warn("Failed to sendDataStreamException {} for {}", throwable,
request, t);
}
@@ -452,15 +427,9 @@ public class DataStreamManagement {
composeAsync(info.getPrevious(), requestExecutor, n ->
JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
if (request.getType() == Type.STREAM_HEADER
- || (request.getType() == Type.STREAM_DATA && !close)) {
+ || request.getType() == Type.STREAM_DATA
+ || close) {
sendReply(remoteWrites, request, bytesWritten,
info.getCommitInfos(), ctx);
- } else if (close) {
- if (info.isPrimary()) {
- // after all server close stream, primary server start
transaction
- startTransaction(info, request, bytesWritten, ctx);
- } else {
- sendReply(remoteWrites, request, bytesWritten,
info.getCommitInfos(), ctx);
- }
} else {
throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
}