szetszwo commented on code in PR #1007:
URL: https://github.com/apache/ratis/pull/1007#discussion_r1450731169
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -39,25 +42,32 @@ private static class PendingStream {
private final ClientInvocationId key;
private long nextId = -1;
private ByteString bytes = ByteString.EMPTY;
+ private List<ReferenceCountedObject<Message>> pendingRefs = new
LinkedList<>();
PendingStream(ClientInvocationId key) {
this.key = key;
}
- synchronized CompletableFuture<ByteString> append(long messageId, Message
message) {
+ synchronized CompletableFuture<ReferenceCountedObject<ByteString>>
append(long messageId,
+ ReferenceCountedObject<Message> messageRef) {
if (nextId == -1) {
nextId = messageId;
} else if (messageId != nextId) {
return JavaUtils.completeExceptionally(new StreamException(
"Unexpected message id in " + key + ": messageId = " + messageId +
" != nextId = " + nextId));
}
nextId++;
+ Message message = messageRef.retain();
+ pendingRefs.add(messageRef);
bytes = bytes.concat(message.getContent());
- return CompletableFuture.completedFuture(bytes);
+
+ ReferenceCountedObject<ByteString> bytesRef =
ReferenceCountedObject.delegateFrom(pendingRefs, bytes);
+ return CompletableFuture.completedFuture(bytesRef);
}
- synchronized CompletableFuture<ByteString> getBytes(long messageId,
Message message) {
- return append(messageId, message);
+ synchronized CompletableFuture<ReferenceCountedObject<ByteString>>
getBytes(long messageId,
+ ReferenceCountedObject<Message> messageRef) {
+ return append(messageId, messageRef);
}
Review Comment:
Make it return
`CompletableFuture<ReferenceCountedObject<RaftClientRequest>>`. The code will
become simpler
```java
synchronized
CompletableFuture<ReferenceCountedObject<RaftClientRequest>>
getWriteRequest(long messageId,
ReferenceCountedObject<RaftClientRequest> requestRef) {
return append(messageId, requestRef)
.thenApply(bytes ->
RaftClientRequest.toWriteRequest(requestRef.get(), () -> bytes))
.thenApply(write ->
ReferenceCountedObject.delegateFrom(pendingRefs, write));
}
```
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -39,25 +42,32 @@ private static class PendingStream {
private final ClientInvocationId key;
private long nextId = -1;
private ByteString bytes = ByteString.EMPTY;
+ private List<ReferenceCountedObject<Message>> pendingRefs = new
LinkedList<>();
PendingStream(ClientInvocationId key) {
this.key = key;
}
- synchronized CompletableFuture<ByteString> append(long messageId, Message
message) {
+ synchronized CompletableFuture<ReferenceCountedObject<ByteString>>
append(long messageId,
+ ReferenceCountedObject<Message> messageRef) {
Review Comment:
Pass `ReferenceCountedObject<RaftClientRequest> requestRef` instead.
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -534,15 +535,21 @@ PendingRequest addPendingRequest(PendingRequests.Permit
permit, RaftClientReques
return pendingRequests.add(permit, request, entry);
}
- CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
- return messageStreamRequests.streamAsync(request)
+ CompletableFuture<RaftClientReply>
streamAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
+ RaftClientRequest request = requestRef.get();
+ return messageStreamRequests.streamAsync(requestRef)
.thenApply(dummy -> server.newSuccessReply(request))
.exceptionally(e -> exception2RaftClientReply(request, e));
}
- CompletableFuture<RaftClientRequest>
streamEndOfRequestAsync(RaftClientRequest request) {
- return messageStreamRequests.streamEndOfRequestAsync(request)
- .thenApply(bytes -> RaftClientRequest.toWriteRequest(request,
Message.valueOf(bytes)));
+ CompletableFuture<ReferenceCountedObject<RaftClientRequest>>
streamEndOfRequestAsync(
+ ReferenceCountedObject<RaftClientRequest> requestRef) {
+ RaftClientRequest request = requestRef.get();
+ return messageStreamRequests.streamEndOfRequestAsync(requestRef)
+ .thenApply(bytesRef -> {
+ RaftClientRequest finalRequest =
RaftClientRequest.toWriteRequest(request, Message.valueOf(bytesRef.get()));
+ return bytesRef.delegate(finalRequest);
+ });
Review Comment:
The code here becomes trivial.
```java
return messageStreamRequests.streamEndOfRequestAsync(requestRef);
```
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -39,25 +42,32 @@ private static class PendingStream {
private final ClientInvocationId key;
private long nextId = -1;
private ByteString bytes = ByteString.EMPTY;
+ private List<ReferenceCountedObject<Message>> pendingRefs = new
LinkedList<>();
Review Comment:
Store `RaftClientRequest` instead.
```java
private final List<ReferenceCountedObject<RaftClientRequest>>
pendingRefs = new LinkedList<>();
```
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -101,7 +114,7 @@ CompletableFuture<ByteString>
streamEndOfRequestAsync(RaftClientRequest request)
if (pending == null) {
return JavaUtils.completeExceptionally(new StreamException(name + ": " +
key + " not found"));
}
- return pending.getBytes(stream.getMessageId(), request.getMessage());
+ return pending.getBytes(stream.getMessageId(),
requestRef.delegate(request.getMessage()));
Review Comment:
Then, we don't have to delegate here.
```java
CompletableFuture<ReferenceCountedObject<RaftClientRequest>>
streamEndOfRequestAsync(
...
return pending.getWriteRequest(stream.getMessageId(), requestRef);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]