This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch RATIS-1931_grpc-zero-copy
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/RATIS-1931_grpc-zero-copy by 
this push:
     new d5bd9c4bd RATIS-2414. Add leak detection for 
ZeroCopyMessageMarshaller. (#1355)
d5bd9c4bd is described below

commit d5bd9c4bdce3cb908b201819043491b6aab9bd5c
Author: slfan1989 <[email protected]>
AuthorDate: Sun Feb 22 03:16:05 2026 +0800

    RATIS-2414. Add leak detection for ZeroCopyMessageMarshaller. (#1355)
---
 .../ratis/grpc/util/ZeroCopyMessageMarshaller.java | 23 ++++++++++++++++
 .../ratis/grpc/util/GrpcZeroCopyTestServer.java    | 31 ++++++++++++++++++++--
 .../apache/ratis/grpc/util/TestGrpcZeroCopy.java   | 30 ++++++++++++++++-----
 3 files changed, 75 insertions(+), 9 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
index 3cdbc07c7..eddf2495e 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
@@ -230,4 +230,27 @@ public class ZeroCopyMessageMarshaller<T extends 
MessageLite> implements Prototy
   public int getUnclosedCount() {
     return unclosedStreams.size();
   }
+
+  void assertNoUnclosedStreams() {
+    // Intended for tests/teardown to fail fast if callers forgot to release 
streams.
+    final int size = unclosedStreams.size();
+    Preconditions.assertTrue(size == 0, () -> name + ": " + size + " unclosed 
stream(s)");
+  }
+
+  public void close() {
+    // Cleanup helper for tests/teardown; do not call while streams may still 
be in use.
+    synchronized (unclosedStreams) {
+      if (unclosedStreams.isEmpty()) {
+        return;
+      }
+      for (InputStream stream : unclosedStreams.values()) {
+        try {
+          stream.close();
+        } catch (IOException e) {
+          LOG.warn("{}: Failed to close leaked stream.", name, e);
+        }
+      }
+      unclosedStreams.clear();
+    }
+  }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java
index 77e234b3e..af7991a41 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java
@@ -78,6 +78,8 @@ class GrpcZeroCopyTestServer implements Closeable {
   private final Count releasedCount = new Count();
 
   private final Server server;
+  // Allow tests to disable release to validate leak detection.
+  private final boolean releaseRequests;
   private final ZeroCopyMessageMarshaller<BinaryRequest> marshaller = new 
ZeroCopyMessageMarshaller<>(
       BinaryRequest.getDefaultInstance(),
       zeroCopyCount::inc,
@@ -85,6 +87,11 @@ class GrpcZeroCopyTestServer implements Closeable {
       releasedCount::inc);
 
   GrpcZeroCopyTestServer(int port) {
+    this(port, true);
+  }
+
+  GrpcZeroCopyTestServer(int port, boolean releaseRequests) {
+    this.releaseRequests = releaseRequests;
     final GreeterImpl greeter = new GreeterImpl();
     final MethodDescriptor<BinaryRequest, BinaryReply> binary = 
GreeterGrpc.getBinaryMethod();
     final String binaryFullMethodName = binary.getFullMethodName();
@@ -130,11 +137,29 @@ class GrpcZeroCopyTestServer implements Closeable {
 
   @Override
   public void close() throws IOException {
+    // Shutdown server first, then assert no leaked streams and cleanup if 
needed.
+    IOException ioe = null;
     try {
       server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw IOUtils.toInterruptedIOException("Failed to close", e);
+      ioe = IOUtils.toInterruptedIOException("Failed to close", e);
+    }
+
+    try {
+      marshaller.assertNoUnclosedStreams();
+    } catch (RuntimeException e) {
+      if (ioe != null) {
+        ioe.addSuppressed(e);
+        throw ioe;
+      }
+      throw e;
+    } finally {
+      marshaller.close();
+    }
+
+    if (ioe != null) {
+      throw ioe;
     }
   }
 
@@ -181,7 +206,9 @@ class GrpcZeroCopyTestServer implements Closeable {
             ByteBuffer.wrap(bytes).putInt(data.size());
             
responseObserver.onNext(BinaryReply.newBuilder().setData(UnsafeByteOperations.unsafeWrap(bytes)).build());
           } finally {
-            marshaller.release(request);
+            if (releaseRequests) {
+              marshaller.release(request);
+            }
           }
         }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java
index 9ffe7ecd0..dde44e579 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java
@@ -26,9 +26,13 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.TraditionalBinaryPrefix;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -60,8 +64,8 @@ public final class TestGrpcZeroCopy extends BaseTest {
         RANDOM.nextBytes(ARRAY);
         final ByteString expected = UnsafeByteOperations.unsafeWrap(ARRAY, 0, 
remaining);
         final ByteString computed = b.substring(offset, offset + remaining);
-        Assertions.assertEquals(expected.size(), computed.size());
-        Assertions.assertEquals(expected, computed);
+        assertEquals(expected.size(), computed.size());
+        assertEquals(expected, computed);
         offset += remaining;
       }
     }
@@ -99,7 +103,7 @@ public final class TestGrpcZeroCopy extends BaseTest {
   /** Test a zero-copy marshaller is available from the versions of gRPC and 
Protobuf. */
   @Test
   public void testReadiness() {
-    Assertions.assertTrue(isReady());
+    assertTrue(isReady());
   }
 
 
@@ -108,6 +112,18 @@ public final class TestGrpcZeroCopy extends BaseTest {
     runTestZeroCopy();
   }
 
+  @Test
+  public void testLeakCheck() throws Exception {
+    // Verify leak detection by disabling release on the server side.
+    assumeTrue(isReady());
+    final GrpcZeroCopyTestServer server = new 
GrpcZeroCopyTestServer(NetUtils.getFreePort(), false);
+    final int port = server.start();
+    try (GrpcZeroCopyTestClient client = new 
GrpcZeroCopyTestClient(NetUtils.LOCALHOST, port)) {
+      sendBinaries(1, client, server);
+    }
+    assertThrows(IllegalStateException.class, server::close);
+  }
+
   void runTestZeroCopy() throws Exception {
     try (GrpcZeroCopyTestServer server = new 
GrpcZeroCopyTestServer(NetUtils.getFreePort())) {
       final int port = server.start();
@@ -134,7 +150,7 @@ public final class TestGrpcZeroCopy extends BaseTest {
     for (int i = 0; i < futures.size(); i++) {
       final String expected = GrpcZeroCopyTestServer.toReply(i, 
messages.get(i));
       final String reply = futures.get(i).get();
-      Assertions.assertEquals(expected, reply, "expected = " + expected + " != 
reply = " + reply);
+      assertEquals(expected, reply, "expected = " + expected + " != reply = " 
+ reply);
       server.assertCounts(numElements, numBytes);
     }
   }
@@ -159,8 +175,8 @@ public final class TestGrpcZeroCopy extends BaseTest {
       }
 
       final ByteString reply = future.get();
-      Assertions.assertEquals(4, reply.size());
-      Assertions.assertEquals(size, reply.asReadOnlyByteBuffer().getInt());
+      assertEquals(4, reply.size());
+      assertEquals(size, reply.asReadOnlyByteBuffer().getInt());
 
       numElements++;
       numBytes += size;

Reply via email to