This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b49a79317 RATIS-1847. Stream has memory leak. (#884)
b49a79317 is described below
commit b49a79317fbc1fcb68448bd603df6f6bf9d18808
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 19 23:33:46 2023 +0800
RATIS-1847. Stream has memory leak. (#884)
---
.../ratis/netty/server/DataStreamManagement.java | 30 ++++++++++++----------
.../netty/server/DataStreamRequestByteBuf.java | 22 +++++++++++++---
2 files changed, 35 insertions(+), 17 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 362199323..b3f42bf32 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -96,6 +96,10 @@ public class DataStreamManagement {
n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options,
stream, executor)
.whenComplete((l, e) -> metrics.stop(context, e == null))));
}
+
+ void cleanUp() {
+ streamFuture.thenAccept(DataStream::cleanUp);
+ }
}
static class RemoteStream {
@@ -374,31 +378,33 @@ public class DataStreamManagement {
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} catch (Throwable t) {
LOG.warn("Failed to sendDataStreamException {} for {}", throwable,
request, t);
+ } finally {
+ request.release();
}
}
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
- final ByteBuf buf = request.slice();
try {
- readImpl(request, ctx, buf, getStreams);
+ readImpl(request, ctx, getStreams);
} catch (Throwable t) {
replyDataStreamException(t, request, ctx);
- buf.release();
}
}
- private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx, ByteBuf buf,
+ private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
final boolean close =
request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
ClientInvocationId key =
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
- final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() ->
newStreamInfo(buf, getStreams));
+ final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
+ () -> newStreamInfo(request.slice(), getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
- streams.remove(key);
+ final StreamInfo removed = streams.remove(key);
+ removed.getLocal().cleanUp();
throw new IllegalStateException("Failed to create a new stream for " +
request
+ " since a stream already exists Key: " + key + " StreamInfo:" +
info);
}
@@ -408,10 +414,7 @@ public class DataStreamManagement {
() -> new IllegalStateException("Failed to remove StreamInfo for " +
request));
} else {
info = Optional.ofNullable(streams.get(key)).orElseThrow(
- () -> {
- streams.remove(key);
- return new IllegalStateException("Failed to get StreamInfo for " +
request);
- });
+ () -> new IllegalStateException("Failed to get StreamInfo for " +
request));
}
final CompletableFuture<Long> localWrite;
@@ -420,7 +423,7 @@ public class DataStreamManagement {
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
- localWrite = info.getLocal().write(buf, request.getWriteOptionList(),
writeExecutor);
+ localWrite = info.getLocal().write(request.slice(),
request.getWriteOptionList(), writeExecutor);
remoteWrites = info.applyToRemotes(out -> out.write(request,
requestExecutor));
} else {
throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
@@ -439,11 +442,12 @@ public class DataStreamManagement {
}, requestExecutor)).whenComplete((v, exception) -> {
try {
if (exception != null) {
- streams.remove(key);
+ final StreamInfo removed = streams.remove(key);
replyDataStreamException(server, exception, info.getRequest(),
request, ctx);
+ removed.getLocal().cleanUp();
}
} finally {
- buf.release();
+ request.release();
}
});
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 0dcd46e02..2542b1ec6 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -30,6 +30,8 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Implements {@link DataStreamRequest} with {@link ByteBuf}.
@@ -37,13 +39,13 @@ import java.util.List;
* This class is immutable.
*/
public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements
DataStreamRequest {
- private final ByteBuf buf;
+ private final AtomicReference<ByteBuf> buf;
private final List<WriteOption> options;
public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId,
long streamOffset,
Iterable<WriteOption> options, ByteBuf buf) {
super(clientId, type, streamId, streamOffset);
- this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
+ this.buf = new AtomicReference<>(buf != null? buf.asReadOnly():
Unpooled.EMPTY_BUFFER);
this.options = Collections.unmodifiableList(Lists.newArrayList(options));
}
@@ -52,13 +54,25 @@ public class DataStreamRequestByteBuf extends
DataStreamPacketImpl implements Da
header.getWriteOptionList(), buf);
}
+ ByteBuf getBuf() {
+ return Optional.ofNullable(buf.get()).orElseThrow(
+ () -> new IllegalStateException("buf is already released in " + this));
+ }
+
@Override
public long getDataLength() {
- return buf.readableBytes();
+ return getBuf().readableBytes();
}
public ByteBuf slice() {
- return buf.slice();
+ return getBuf().slice();
+ }
+
+ public void release() {
+ final ByteBuf got = buf.getAndSet(null);
+ if (got != null) {
+ got.release();
+ }
}
@Override