szetszwo commented on code in PR #1021:
URL: https://github.com/apache/ratis/pull/1021#discussion_r1458122562
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java:
##########
@@ -64,7 +64,11 @@ private static class PendingOrderedRequest implements
SlidingWindow.ServerSideRe
PendingOrderedRequest(ReferenceCountedObject<RaftClientRequest>
requestRef) {
this.requestRef = requestRef;
- this.request = requestRef != null ? requestRef.get() : null;
+ this.request = requestRef != null ? requestRef.retain() : null;
+ }
+
+ void release() {
+ requestRef.release();
}
Review Comment:
This method could be removed.
##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java:
##########
@@ -128,9 +128,11 @@ <OUTPUT, THROWABLE extends Throwable> OUTPUT
runSequentially(
* Used by the leader.
*/
default CompletableFuture<Long> appendEntry(LogEntryProto entry,
TransactionContext context) {
- return appendEntry(entry);
+ return appendEntry(ReferenceCountedObject.wrap(entry), context);
}
+ CompletableFuture<Long> appendEntry(ReferenceCountedObject<LogEntryProto>
entryRef, TransactionContext context);
+
Review Comment:
Since this is in `ratis-server-api`, we have to provide a default
implementation of the new method for compatibility.
```java
/**
* Append asynchronously an entry.
* Used by the leader.
*/
default CompletableFuture<Long>
appendEntry(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext
context) {
return appendEntry(entryRef.get(), context);
}
/**
* @deprecated use {@link #append(ReferenceCountedObject)}.
*/
@Deprecated
default CompletableFuture<Long> appendEntry(LogEntryProto entry,
TransactionContext context) {
throw new UnsupportedOperationException();
}
```
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java:
##########
@@ -363,6 +367,7 @@ void processClientRequest(PendingOrderedRequest pending) {
final long seq = pending.getSeqNum();
processClientRequest(pending.getRequestRef(),
reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
+ pending.release();
Review Comment:
We should use `whenComplete(..)`.
```java
final ReferenceCountedObject<RaftClientRequest> ref =
pending.getRequestRef();
processClientRequest(ref, reply -> slidingWindow.receiveReply(seq,
reply, this::sendReply))
.whenComplete((v, e) -> ref.release());
```
##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java:
##########
@@ -128,9 +128,11 @@ <OUTPUT, THROWABLE extends Throwable> OUTPUT
runSequentially(
* Used by the leader.
*/
default CompletableFuture<Long> appendEntry(LogEntryProto entry,
TransactionContext context) {
- return appendEntry(entry);
+ return appendEntry(ReferenceCountedObject.wrap(entry), context);
}
+ CompletableFuture<Long> appendEntry(ReferenceCountedObject<LogEntryProto>
entryRef, TransactionContext context);
+
Review Comment:
Update also `RaftLogBase`:
```java
@@ -345,15 +346,17 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
- return appendEntry(entry, null);
+ return appendEntry(ReferenceCountedObject.wrap(entry), null);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]