szetszwo commented on code in PR #1179:
URL: https://github.com/apache/ratis/pull/1179#discussion_r1855522106
##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -390,8 +396,7 @@ static void sendDataStreamException(Throwable throwable,
DataStreamRequestByteBu
void cleanUp(Set<ClientInvocationId> ids) {
for (ClientInvocationId clientInvocationId : ids) {
Optional.ofNullable(streams.remove(clientInvocationId))
- .map(StreamInfo::getLocal)
- .ifPresent(LocalStream::cleanUp);
+ .ifPresent(streamInfo -> streamInfo.cleanUp(clientInvocationId));
Review Comment:
We should call `removeDataStream` here, i.e.
```diff
   void cleanUp(Set<ClientInvocationId> ids) {
     for (ClientInvocationId clientInvocationId : ids) {
-      Optional.ofNullable(streams.remove(clientInvocationId))
-          .map(StreamInfo::getLocal)
-          .ifPresent(LocalStream::cleanUp);
+      removeDataStream(clientInvocationId);
     }
   }
```
##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -479,7 +485,12 @@ private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ct
try {
if (exception != null) {
replyDataStreamException(server, exception, info.getRequest(),
request, ctx);
- removeDataStream(key, info);
+ }
+ if (close || exception != null) {
Review Comment:
For the normal close case:
- The local stream (to a local file) is a `DataStream` created by the
application. It should be removed from the `DataStreamMap` and closed during
`link(..)`.
- Remote streams (server to server) should be closed here. Not closing them is
the leak.
To be safe, call `info.cleanUp(key)` when `removed == null`. The code
becomes
```java
if (exception != null) {
  replyDataStreamException(server, exception, info.getRequest(), request, ctx);
  final StreamInfo removed = removeDataStream(key);
  if (removed != null) {
    Preconditions.assertSame(info, removed, "removed");
  } else {
    info.cleanUp(key);
  }
} else if (close) {
  info.applyToRemotes(remote -> remote.out.closeAsync());
}
```
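To make the three outcomes easier to check, here is a minimal self-contained sketch of the same branch structure. `SketchStreamInfo`, `SketchStreamMap`, `closeRemotes`, and `onReply` are hypothetical stand-ins, not the Ratis classes, and the sketch assumes that `removeDataStream(key)` both removes the entry and cleans it up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for StreamInfo; it only records what was called. */
class SketchStreamInfo {
  final List<String> events = new ArrayList<>();

  void cleanUp(String key) {
    events.add("cleanUp:" + key);
  }

  void closeRemotes() {
    events.add("closeRemotes");
  }
}

/** Hypothetical stand-in for the stream map inside DataStreamManagement. */
class SketchStreamMap {
  private final Map<String, SketchStreamInfo> streams = new ConcurrentHashMap<>();

  void put(String key, SketchStreamInfo info) {
    streams.put(key, info);
  }

  boolean contains(String key) {
    return streams.containsKey(key);
  }

  /** Assumption: removing an entry also cleans it up. Returns the removed entry or null. */
  SketchStreamInfo removeDataStream(String key) {
    final SketchStreamInfo removed = streams.remove(key);
    if (removed != null) {
      removed.cleanUp(key);
    }
    return removed;
  }

  /** Same branch structure as the suggested code above. */
  void onReply(String key, SketchStreamInfo info, Throwable exception, boolean close) {
    if (exception != null) {
      final SketchStreamInfo removed = removeDataStream(key);
      if (removed != null) {
        if (removed != info) {
          throw new IllegalStateException("removed != info");
        }
      } else {
        // The entry was already gone from the map: clean up the info directly.
        info.cleanUp(key);
      }
    } else if (close) {
      // Normal close: only the remote streams are closed here.
      info.closeRemotes();
    }
  }
}
```

In this sketch, an exception always ends with exactly one `cleanUp` call on the info, whether or not the entry was still in the map, while a normal close leaves the map entry in place for `link(..)` to remove.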
##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -459,7 +461,11 @@ private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ct
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
- localWrite = info.getLocal().write(request.slice(),
request.getWriteOptionList(), writeExecutor);
+ if (close && request.getDataLength() == 0) {
+ localWrite = CompletableFuture.completedFuture(0L);
+ } else {
+ localWrite = info.getLocal().write(request.slice(),
request.getWriteOptionList(), writeExecutor);
+ }
Review Comment:
This part is an optimization that avoids calling `getLocal().write(..)`.
However, `getLocal().write(..)` also takes care of `WriteOption`s such as
`SYNC` and `FLUSH`, so we cannot skip it here. We could instead skip empty
buffers later on, in
```java
//writeTo(..)
@@ -301,6 +307,9 @@ public class DataStreamManagement {
     final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
+      if (buffer.remaining() == 0) {
+        continue;
+      }
       final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(
           buffer, buf::retain, ignored -> buf.release());
       try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
```
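The point of skipping inside the loop rather than skipping the whole write can be shown with a small standalone sketch. `SketchWriter` and its counters are hypothetical, and the assumption that the sync option is applied after the buffer loop is mine for illustration, not a statement about the actual `writeTo(..)` body.

```java
import java.nio.ByteBuffer;
import java.util.List;

/** Hypothetical writer: counts channel writes and sync calls instead of doing I/O. */
class SketchWriter {
  int channelWrites = 0;
  int forces = 0;

  /** Writes the non-empty buffers, then honors the sync flag even if nothing was written. */
  long writeTo(List<ByteBuffer> buffers, boolean sync) {
    long bytesWritten = 0;
    for (ByteBuffer buffer : buffers) {
      if (buffer.remaining() == 0) {
        continue; // empty buffer: skip the channel call only
      }
      bytesWritten += buffer.remaining();
      buffer.position(buffer.limit()); // pretend the channel consumed the buffer
      channelWrites++;
    }
    if (sync) {
      forces++; // stands in for forcing the channel to disk
    }
    return bytesWritten;
  }
}
```

With this shape, a zero-length close request that carries a sync option performs no channel write but still triggers the sync, which is the behavior that would be lost if the `write(..)` call were skipped entirely.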