This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch RATIS-1931_grpc-zero-copy
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/RATIS-1931_grpc-zero-copy by this push:
new d5bd9c4bd RATIS-2414. Add leak detection for ZeroCopyMessageMarshaller. (#1355)
d5bd9c4bd is described below
commit d5bd9c4bdce3cb908b201819043491b6aab9bd5c
Author: slfan1989 <[email protected]>
AuthorDate: Sun Feb 22 03:16:05 2026 +0800
RATIS-2414. Add leak detection for ZeroCopyMessageMarshaller. (#1355)
---
.../ratis/grpc/util/ZeroCopyMessageMarshaller.java | 23 ++++++++++++++++
.../ratis/grpc/util/GrpcZeroCopyTestServer.java | 31 ++++++++++++++++++++--
.../apache/ratis/grpc/util/TestGrpcZeroCopy.java | 30 ++++++++++++++++-----
3 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
index 3cdbc07c7..eddf2495e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java
@@ -230,4 +230,27 @@ public class ZeroCopyMessageMarshaller<T extends MessageLite> implements Prototy
public int getUnclosedCount() {
return unclosedStreams.size();
}
+
+ void assertNoUnclosedStreams() {
+ // Intended for tests/teardown to fail fast if callers forgot to release streams.
+ final int size = unclosedStreams.size();
+ Preconditions.assertTrue(size == 0, () -> name + ": " + size + " unclosed stream(s)");
+ }
+
+ public void close() {
+ // Cleanup helper for tests/teardown; do not call while streams may still be in use.
+ synchronized (unclosedStreams) {
+ if (unclosedStreams.isEmpty()) {
+ return;
+ }
+ for (InputStream stream : unclosedStreams.values()) {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ LOG.warn("{}: Failed to close leaked stream.", name, e);
+ }
+ }
+ unclosedStreams.clear();
+ }
+ }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java
index 77e234b3e..af7991a41 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestServer.java
@@ -78,6 +78,8 @@ class GrpcZeroCopyTestServer implements Closeable {
private final Count releasedCount = new Count();
private final Server server;
+ // Allow tests to disable release to validate leak detection.
+ private final boolean releaseRequests;
private final ZeroCopyMessageMarshaller<BinaryRequest> marshaller = new ZeroCopyMessageMarshaller<>(
BinaryRequest.getDefaultInstance(),
zeroCopyCount::inc,
@@ -85,6 +87,11 @@ class GrpcZeroCopyTestServer implements Closeable {
releasedCount::inc);
GrpcZeroCopyTestServer(int port) {
+ this(port, true);
+ }
+
+ GrpcZeroCopyTestServer(int port, boolean releaseRequests) {
+ this.releaseRequests = releaseRequests;
final GreeterImpl greeter = new GreeterImpl();
final MethodDescriptor<BinaryRequest, BinaryReply> binary = GreeterGrpc.getBinaryMethod();
final String binaryFullMethodName = binary.getFullMethodName();
@@ -130,11 +137,29 @@ class GrpcZeroCopyTestServer implements Closeable {
@Override
public void close() throws IOException {
+ // Shutdown server first, then assert no leaked streams and cleanup if needed.
+ IOException ioe = null;
try {
server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw IOUtils.toInterruptedIOException("Failed to close", e);
+ ioe = IOUtils.toInterruptedIOException("Failed to close", e);
+ }
+
+ try {
+ marshaller.assertNoUnclosedStreams();
+ } catch (RuntimeException e) {
+ if (ioe != null) {
+ ioe.addSuppressed(e);
+ throw ioe;
+ }
+ throw e;
+ } finally {
+ marshaller.close();
+ }
+
+ if (ioe != null) {
+ throw ioe;
}
}
@@ -181,7 +206,9 @@ class GrpcZeroCopyTestServer implements Closeable {
ByteBuffer.wrap(bytes).putInt(data.size());
responseObserver.onNext(BinaryReply.newBuilder().setData(UnsafeByteOperations.unsafeWrap(bytes)).build());
} finally {
- marshaller.release(request);
+ if (releaseRequests) {
+ marshaller.release(request);
+ }
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java
index 9ffe7ecd0..dde44e579 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestGrpcZeroCopy.java
@@ -26,9 +26,13 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TraditionalBinaryPrefix;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -60,8 +64,8 @@ public final class TestGrpcZeroCopy extends BaseTest {
RANDOM.nextBytes(ARRAY);
final ByteString expected = UnsafeByteOperations.unsafeWrap(ARRAY, 0, remaining);
final ByteString computed = b.substring(offset, offset + remaining);
- Assertions.assertEquals(expected.size(), computed.size());
- Assertions.assertEquals(expected, computed);
+ assertEquals(expected.size(), computed.size());
+ assertEquals(expected, computed);
offset += remaining;
}
}
@@ -99,7 +103,7 @@ public final class TestGrpcZeroCopy extends BaseTest {
/** Test a zero-copy marshaller is available from the versions of gRPC and Protobuf. */
@Test
public void testReadiness() {
- Assertions.assertTrue(isReady());
+ assertTrue(isReady());
}
@@ -108,6 +112,18 @@ public final class TestGrpcZeroCopy extends BaseTest {
runTestZeroCopy();
}
+ @Test
+ public void testLeakCheck() throws Exception {
+ // Verify leak detection by disabling release on the server side.
+ assumeTrue(isReady());
+ final GrpcZeroCopyTestServer server = new GrpcZeroCopyTestServer(NetUtils.getFreePort(), false);
+ final int port = server.start();
+ try (GrpcZeroCopyTestClient client = new GrpcZeroCopyTestClient(NetUtils.LOCALHOST, port)) {
+ sendBinaries(1, client, server);
+ }
+ assertThrows(IllegalStateException.class, server::close);
+ }
+
void runTestZeroCopy() throws Exception {
try (GrpcZeroCopyTestServer server = new GrpcZeroCopyTestServer(NetUtils.getFreePort())) {
final int port = server.start();
@@ -134,7 +150,7 @@ public final class TestGrpcZeroCopy extends BaseTest {
for (int i = 0; i < futures.size(); i++) {
final String expected = GrpcZeroCopyTestServer.toReply(i, messages.get(i));
final String reply = futures.get(i).get();
- Assertions.assertEquals(expected, reply, "expected = " + expected + " != reply = " + reply);
+ assertEquals(expected, reply, "expected = " + expected + " != reply = " + reply);
server.assertCounts(numElements, numBytes);
}
}
@@ -159,8 +175,8 @@ public final class TestGrpcZeroCopy extends BaseTest {
}
final ByteString reply = future.get();
- Assertions.assertEquals(4, reply.size());
- Assertions.assertEquals(size, reply.asReadOnlyByteBuffer().getInt());
+ assertEquals(4, reply.size());
+ assertEquals(size, reply.asReadOnlyByteBuffer().getInt());
numElements++;
numBytes += size;