szetszwo commented on code in PR #1032:
URL: https://github.com/apache/ratis/pull/1032#discussion_r1469990865


##########
ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java:
##########
@@ -462,6 +463,9 @@ public synchronized void receivedRequest(REQUEST request, 
Consumer<REQUEST> proc
       } else {
         final boolean isRetry = requests.putIfAbsent(request);
         LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
+        if (isRetry || request.getSeqNum() < nextToProcess) {

Review Comment:
   If `request.getSeqNum() < nextToProcess`, the request should not be added 
then. 
   ```java
       public synchronized void receivedRequest(REQUEST request, 
Consumer<REQUEST> processingMethod) {
         final long seqNum = request.getSeqNum();
         final boolean accepted;
         if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
           nextToProcess = seqNum;
           requests.putNewRequest(request);
           LOG.debug("Received seq={} (first request), {}", seqNum, this);
           accepted = true;
         } else if (request.getSeqNum() < nextToProcess) {
           LOG.debug("Received seq={} < nextToProcess {}, {}", seqNum, 
nextToProcess, this);
           accepted = false;
         } else {
           final boolean isRetry = requests.putIfAbsent(request);
           LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
           accepted = !isRetry;
         }
   
         if (accepted) {
           processRequestsFromHead(processingMethod);
         } else {
           request.release();
         }
       }
   ```



##########
ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java:
##########
@@ -117,7 +117,13 @@ public V retain() {
 
       @Override
       public boolean release() {
-        return 
fromRefs.stream().map(ReferenceCountedObject::release).allMatch(r -> r);
+        boolean allReleased = true;
+        for (ReferenceCountedObject ref : fromRefs) {

Review Comment:
   Add type parameter:
   ```java
           for (ReferenceCountedObject<T> ref : fromRefs) {
   ```



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java:
##########
@@ -52,11 +52,13 @@ class GrpcServerProtocolService extends 
RaftServerProtocolServiceImplBase {
 
   static class PendingServerRequest<REQUEST> {
     private final REQUEST request;
+    private final ReferenceCountedObject<REQUEST> requestRef;
+    private AtomicBoolean released = new AtomicBoolean(false);

Review Comment:
   Let's combine these three fields:
   ```java
     static class PendingServerRequest<REQUEST> {
       private final AtomicReference<ReferenceCountedObject<REQUEST>> 
requestRef;
       private final CompletableFuture<Void> future = new CompletableFuture<>();
   
       PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
         requestRef.retain();
         this.requestRef = new AtomicReference<>(requestRef);
       }
   
       REQUEST getRequest() {
         return Optional.ofNullable(requestRef.get())
             .map(ReferenceCountedObject::get)
             .orElse(null);
       }
   
       CompletableFuture<Void> getFuture() {
         return future;
       }
   
       void release() {
         Optional.ofNullable(requestRef.getAndSet(null))
             .ifPresent(ReferenceCountedObject::release);
       }
     }
   ```



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java:
##########
@@ -320,11 +320,13 @@ void 
processClientRequest(ReferenceCountedObject<RaftClientRequest> requestRef)
 
       final CompletableFuture<Void> f = processClientRequest(requestRef, reply 
-> {
         if (!reply.isSuccess()) {
-          LOG.info("Failed {}, reply={}", request, reply);
+          LOG.info("Failed {}, reply={}", request.toStringShort(), reply);

Review Comment:
   We cannot use `request` here
   ```java
         final SlidingWindowEntry slidingWindowEntry = 
request.getSlidingWindowEntry();
         final CompletableFuture<Void> f = processClientRequest(requestRef, 
reply -> {
           if (!reply.isSuccess()) {
             LOG.info("Failed request cid={}, {}, reply={}", callId, 
slidingWindowEntry, reply);
           }
           final RaftClientReplyProto proto = 
ClientProtoUtils.toRaftClientReplyProto(reply);
           responseNext(proto);
         });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to