szetszwo commented on code in PR #1179:
URL: https://github.com/apache/ratis/pull/1179#discussion_r1852553011
##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -390,8 +390,13 @@ static void sendDataStreamException(Throwable throwable,
DataStreamRequestByteBu
void cleanUp(Set<ClientInvocationId> ids) {
for (ClientInvocationId clientInvocationId : ids) {
Optional.ofNullable(streams.remove(clientInvocationId))
- .map(StreamInfo::getLocal)
- .ifPresent(LocalStream::cleanUp);
+ .ifPresent(streamInfo -> {
+ streamInfo.getDivision()
+ .getDataStreamMap()
+ .remove(clientInvocationId);
+ streamInfo.getLocal().cleanUp();
+ streamInfo.applyToRemotes(out -> out.out.closeAsync());
+ });
Review Comment:
Let's use the `removeDataStream` method here.
BTW, we should update it to not passing `StreamInfo` and then check if the
removed steam is the same. If not, call `cleanUp(..)`. See below.
```java
- private void removeDataStream(ClientInvocationId invocationId, StreamInfo
info) {
+ private StreamInfo removeDataStream(ClientInvocationId invocationId) {
final StreamInfo removed = streams.remove(invocationId);
- if (info == null) {
- info = removed;
- }
- if (info != null) {
- info.getDivision().getDataStreamMap().remove(invocationId);
- info.getLocal().cleanUp();
+ if (removed != null) {
+ removed.cleanUp(invocationId);
}
+ return removed;
}
private void readImpl(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx,
@@ -479,7 +480,10 @@ public class DataStreamManagement {
try {
if (exception != null) {
replyDataStreamException(server, exception, info.getRequest(),
request, ctx);
- removeDataStream(key, info);
+ final StreamInfo removed = removeDataStream(key);
+ if (removed != info) {
+ info.cleanUp(key);
+ }
}
} finally {
request.release();
```
##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -423,6 +428,7 @@ private void removeDataStream(ClientInvocationId
invocationId, StreamInfo info)
if (info != null) {
info.getDivision().getDataStreamMap().remove(invocationId);
info.getLocal().cleanUp();
+ info.applyToRemotes(out -> out.out.closeAsync());
Review Comment:
Let's add a `cleanUp()` method to `StreamInfo`.
```java
void cleanUp(ClientInvocationId invocationId) {
getDivision().getDataStreamMap().remove(invocationId);
getLocal().cleanUp();
applyToRemotes(remote -> remote.out.closeAsync());
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]