This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch release-3.1.3
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 8c7fbb6c03f9249244a38e4d4a6143bf8f5e5f69 Author: Symious <[email protected]> AuthorDate: Thu Dec 26 02:57:36 2024 +0800 RATIS-2197. Clean remote stream to resolve direct memory leak (#1179) --- .../ratis/client/impl/DataStreamClientImpl.java | 3 ++ .../ratis/netty/server/DataStreamManagement.java | 35 ++++++++++++++-------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java index ba91866d7..313131cbd 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java @@ -153,6 +153,9 @@ public class DataStreamClientImpl implements DataStreamClient { private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, Iterable<WriteOption> options) { if (isClosed()) { + if (data instanceof ByteBuf) { + ((ByteBuf) data).release(); + } return JavaUtils.completeExceptionally(new AlreadyClosedException( clientId + ": stream already closed, request=" + header)); } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 74d5cd7fd..711063185 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -211,6 +211,12 @@ public class DataStreamManagement { return Collections.emptySet(); } + + void cleanUp(ClientInvocationId invocationId) { + getDivision().getDataStreamMap().remove(invocationId); + getLocal().cleanUp(); + applyToRemotes(remote -> remote.out.closeAsync()); + } } private final RaftServer server; @@ -301,6 +307,9 @@ public class DataStreamManagement { final DataChannel channel = stream.getDataChannel(); 
long byteWritten = 0; for (ByteBuffer buffer : buf.nioBuffers()) { + if (buffer.remaining() == 0) { + continue; + } final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap( buffer, buf::retain, ignored -> buf.release()); try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) { @@ -389,9 +398,7 @@ public class DataStreamManagement { void cleanUp(Set<ClientInvocationId> ids) { for (ClientInvocationId clientInvocationId : ids) { - Optional.ofNullable(streams.remove(clientInvocationId)) - .map(StreamInfo::getLocal) - .ifPresent(LocalStream::cleanUp); + removeDataStream(clientInvocationId); } } @@ -411,19 +418,16 @@ public class DataStreamManagement { readImpl(request, ctx, getStreams); } catch (Throwable t) { replyDataStreamException(t, request, ctx); - removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()), null); + removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId())); } } - private void removeDataStream(ClientInvocationId invocationId, StreamInfo info) { + private StreamInfo removeDataStream(ClientInvocationId invocationId) { final StreamInfo removed = streams.remove(invocationId); - if (info == null) { - info = removed; - } - if (info != null) { - info.getDivision().getDataStreamMap().remove(invocationId); - info.getLocal().cleanUp(); + if (removed != null) { + removed.cleanUp(invocationId); } + return removed; } private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, @@ -479,7 +483,14 @@ public class DataStreamManagement { try { if (exception != null) { replyDataStreamException(server, exception, info.getRequest(), request, ctx); - removeDataStream(key, info); + final StreamInfo removed = removeDataStream(key); + if (removed != null) { + Preconditions.assertSame(info, removed, "removed"); + } else { + info.cleanUp(key); + } + } else if (close) { + info.applyToRemotes(remote -> remote.out.closeAsync()); } } finally { 
request.release();
