This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new fb062dd RATIS-1456. Clean StreamMap when an exception occurs in
DataStreamManagement#read (#551)
fb062dd is described below
commit fb062dd90146ad54c5ab4d6f459c05d20b7f0322
Author: hao guo <[email protected]>
AuthorDate: Fri Dec 3 10:36:57 2021 +0800
RATIS-1456. Clean StreamMap when an exception occurs in
DataStreamManagement#read (#551)
---
.../apache/ratis/netty/server/DataStreamManagement.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 7539245..98e9c9e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -378,6 +378,7 @@ public class DataStreamManagement {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() ->
newStreamInfo(buf, getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
+ streams.remove(key);
throw new IllegalStateException("Failed to create a new stream for " +
request
+ " since a stream already exists Key: " + key + " StreamInfo:" +
info);
}
@@ -386,7 +387,10 @@ public class DataStreamManagement {
() -> new IllegalStateException("Failed to remove StreamInfo for " +
request));
} else {
info = Optional.ofNullable(streams.get(key)).orElseThrow(
- () -> new IllegalStateException("Failed to get StreamInfo for " +
request));
+ () -> {
+ streams.remove(key);
+ return new IllegalStateException("Failed to get StreamInfo for " +
request);
+ });
}
final CompletableFuture<Long> localWrite;
@@ -420,6 +424,7 @@ public class DataStreamManagement {
}, requestExecutor)).whenComplete((v, exception) -> {
try {
if (exception != null) {
+ streams.remove(key);
replyDataStreamException(server, exception, info.getRequest(),
request, ctx);
}
} finally {
@@ -441,7 +446,14 @@ public class DataStreamManagement {
for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
final DataStreamReply reply = replyFuture.join();
assertReplyCorrespondingToRequest(request, reply);
- if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
+ if (!reply.isSuccess()) {
+ LOG.warn("reply is not success, request: {}", request);
+ return false;
+ }
+ if (reply.getBytesWritten() != bytesWritten) {
+ LOG.warn(
+ "reply written bytes not match, local size: {} remote size: {}
request: {}",
+ bytesWritten, reply.getBytesWritten(), request);
return false;
}
}