This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 117d03abe RATIS-2197. Clean remote stream to resolve direct memory leak (#1179)
117d03abe is described below
commit 117d03abeea7092f47c29654acd5aee8cf4a9488
Author: Symious <[email protected]>
AuthorDate: Thu Dec 26 02:57:36 2024 +0800
RATIS-2197. Clean remote stream to resolve direct memory leak (#1179)
---
.../ratis/client/impl/DataStreamClientImpl.java | 3 ++
.../ratis/netty/server/DataStreamManagement.java | 35 ++++++++++++++--------
2 files changed, 26 insertions(+), 12 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index ba91866d7..313131cbd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -153,6 +153,9 @@ public class DataStreamClientImpl implements DataStreamClient {
private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data,
long length, Iterable<WriteOption> options) {
if (isClosed()) {
+ if (data instanceof ByteBuf) {
+ ((ByteBuf) data).release();
+ }
return JavaUtils.completeExceptionally(new AlreadyClosedException(
clientId + ": stream already closed, request=" + header));
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 74d5cd7fd..711063185 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -211,6 +211,12 @@ public class DataStreamManagement {
return Collections.emptySet();
}
+
+ void cleanUp(ClientInvocationId invocationId) {
+ getDivision().getDataStreamMap().remove(invocationId);
+ getLocal().cleanUp();
+ applyToRemotes(remote -> remote.out.closeAsync());
+ }
}
private final RaftServer server;
@@ -301,6 +307,9 @@ public class DataStreamManagement {
final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
+ if (buffer.remaining() == 0) {
+ continue;
+ }
final ReferenceCountedObject<ByteBuffer> wrapped =
ReferenceCountedObject.wrap(
buffer, buf::retain, ignored -> buf.release());
try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
@@ -389,9 +398,7 @@ public class DataStreamManagement {
void cleanUp(Set<ClientInvocationId> ids) {
for (ClientInvocationId clientInvocationId : ids) {
- Optional.ofNullable(streams.remove(clientInvocationId))
- .map(StreamInfo::getLocal)
- .ifPresent(LocalStream::cleanUp);
+ removeDataStream(clientInvocationId);
}
}
@@ -411,19 +418,16 @@ public class DataStreamManagement {
readImpl(request, ctx, getStreams);
} catch (Throwable t) {
replyDataStreamException(t, request, ctx);
- removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()), null);
+ removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()));
}
}
- private void removeDataStream(ClientInvocationId invocationId, StreamInfo info) {
+ private StreamInfo removeDataStream(ClientInvocationId invocationId) {
final StreamInfo removed = streams.remove(invocationId);
- if (info == null) {
- info = removed;
- }
- if (info != null) {
- info.getDivision().getDataStreamMap().remove(invocationId);
- info.getLocal().cleanUp();
+ if (removed != null) {
+ removed.cleanUp(invocationId);
}
+ return removed;
}
private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx,
@@ -479,7 +483,14 @@ public class DataStreamManagement {
try {
if (exception != null) {
replyDataStreamException(server, exception, info.getRequest(), request, ctx);
- removeDataStream(key, info);
+ final StreamInfo removed = removeDataStream(key);
+ if (removed != null) {
+ Preconditions.assertSame(info, removed, "removed");
+ } else {
+ info.cleanUp(key);
+ }
+ } else if (close) {
+ info.applyToRemotes(remote -> remote.out.closeAsync());
}
} finally {
request.release();