This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e2aaddca9 RATIS-2007. Zero-copy buffers are not released (#1027)
e2aaddca9 is described below

commit e2aaddca9b850e978ee7a1634cadac76e81559c4
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Jan 24 22:54:32 2024 -0800

    RATIS-2007. Zero-copy buffers are not released (#1027)
---
 .../protocol/RaftClientAsynchronousProtocol.java   |  6 ++---
 .../java/org/apache/ratis/util/SlidingWindow.java  | 12 ++++++++--
 .../grpc/server/GrpcClientProtocolService.java     | 17 +++++++++++---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 26 +++++++++++++---------
 .../ratis/server/raftlog/segmented/LogSegment.java |  1 +
 5 files changed, 43 insertions(+), 19 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index 1985bbe66..222ccff05 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -46,11 +46,11 @@ public interface RaftClientAsynchronousProtocol {
       ReferenceCountedObject<RaftClientRequest> requestRef) {
     try {
       // for backward compatibility
-      return submitClientRequestAsync(requestRef.retain())
-          .whenComplete((r, e) -> requestRef.release());
+      return submitClientRequestAsync(requestRef.retain());
     } catch (Exception e) {
-      requestRef.release();
       return JavaUtils.completeExceptionally(e);
+    } finally {
+      requestRef.release();
     }
   }
 }
\ No newline at end of file
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java 
b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 732e3d890..7adc7a721 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -52,6 +52,9 @@ public interface SlidingWindow {
     boolean hasReply();
 
     void fail(Throwable e);
+
+    default void release() {
+    }
   }
 
   interface ClientSideRequest<REPLY> extends Request<REPLY> {
@@ -170,8 +173,12 @@ public interface SlidingWindow {
       putNewRequest(end);
     }
 
-    void clear() {
+    void clear(long nextToProcess) {
       LOG.debug("close {}", this);
+      final SortedMap<Long, REQUEST> tail = requests.tailMap(nextToProcess);
+      for (REQUEST r : tail.values()) {
+        r.release();
+      }
       requests.clear();
     }
 
@@ -469,6 +476,7 @@ public interface SlidingWindow {
           return;
         } else if (r.getSeqNum() == nextToProcess) {
           processingMethod.accept(r);
+          r.release();
           nextToProcess++;
         }
       }
@@ -514,7 +522,7 @@ public interface SlidingWindow {
 
     @Override
     public void close() {
-      requests.clear();
+      requests.clear(nextToProcess);
     }
   }
 }
\ No newline at end of file
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 67e75d606..0671a1841 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -96,6 +96,13 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
       return requestRef;
     }
 
+    @Override
+    public void release() {
+      if (requestRef != null) {
+        requestRef.release();
+      }
+    }
+
     @Override
     public long getSeqNum() {
       return request != null? request.getSlidingWindowEntry().getSeqNum(): 
Long.MAX_VALUE;
@@ -363,7 +370,6 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
       final long seq = pending.getSeqNum();
       processClientRequest(pending.getRequestRef(),
           reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
-      pending.getRequestRef().release();
     }
 
     @Override
@@ -378,7 +384,6 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
         final RaftGroupId requestGroupId = request.getRaftGroupId();
         // use the group id in the first request as the group id of this 
observer
         final RaftGroupId updated = groupId.updateAndGet(g -> g != null ? g : 
requestGroupId);
-        final PendingOrderedRequest pending = new 
PendingOrderedRequest(requestRef);
 
         if (!requestGroupId.equals(updated)) {
           final GroupMismatchException exception = new 
GroupMismatchException(getId()
@@ -387,7 +392,13 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
           responseError(exception, () -> "processClientRequest (Group 
mismatched) for " + request);
           return;
         }
-        slidingWindow.receivedRequest(pending, this::processClientRequest);
+        final PendingOrderedRequest pending = new 
PendingOrderedRequest(requestRef);
+        try {
+          slidingWindow.receivedRequest(pending, this::processClientRequest);
+        } catch (Exception e) {
+          pending.release();
+          throw e;
+        }
       } finally {
         requestRef.release();
       }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8ad835474..396243b1c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -887,15 +887,19 @@ class RaftServerImpl implements RaftServer.Division,
       return CompletableFuture.completedFuture(reply);
     }
 
-    final Timekeeper timer = 
raftServerMetrics.getClientRequestTimer(request.getType());
-    final Optional<Timekeeper.Context> timerContext = 
Optional.ofNullable(timer).map(Timekeeper::time);
-    return replyFuture(requestRef).whenComplete((clientReply, exception) -> {
+    try {
+      RaftClientRequest.Type type = request.getType();
+      final Timekeeper timer = raftServerMetrics.getClientRequestTimer(type);
+      final Optional<Timekeeper.Context> timerContext = 
Optional.ofNullable(timer).map(Timekeeper::time);
+      return replyFuture(requestRef).whenComplete((clientReply, exception) -> {
+        timerContext.ifPresent(Timekeeper.Context::stop);
+        if (exception != null || clientReply.getException() != null) {
+          raftServerMetrics.incFailedRequestCount(type);
+        }
+      });
+    } finally {
       requestRef.release();
-      timerContext.ifPresent(Timekeeper.Context::stop);
-      if (exception != null || clientReply.getException() != null) {
-        raftServerMetrics.incFailedRequestCount(request.getType());
-      }
-    });
+    }
   }
 
   private CompletableFuture<RaftClientReply> 
replyFuture(ReferenceCountedObject<RaftClientRequest> requestRef) {
@@ -1479,12 +1483,12 @@ class RaftServerImpl implements RaftServer.Division,
       preAppendEntriesAsync(requestorId, 
ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
           previous, r.getLeaderCommit(), r.getInitializing(), entries);
       return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, 
r.getLeaderCommit(),
-          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), 
entries, requestRef)
-          .whenComplete((reply, e) -> requestRef.release());
+          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), 
entries, requestRef);
     } catch(Exception t) {
       LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
-      requestRef.release();
       throw t;
+    } finally {
+      requestRef.release();
     }
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 0750d2cc8..68da35014 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -245,6 +245,7 @@ public final class LogSegment {
         if (ti.equals(key.getTermIndex())) {
           toReturn.set(entry);
         }
+        entryRef.release();
       });
       loadingTimes.incrementAndGet();
       return Objects.requireNonNull(toReturn.get());

Reply via email to