This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 58e89cb7e RATIS-1853. When the stream server channelInactive, all
requests in the channel. (#889)
58e89cb7e is described below
commit 58e89cb7e1d50fb00f5798bba419f07d9aee4f0b
Author: hao guo <[email protected]>
AuthorDate: Sat Jul 1 13:16:26 2023 +0800
RATIS-1853. When the stream server channelInactive, all requests in the
channel. (#889)
---
.../ratis/netty/server/DataStreamManagement.java | 49 +++++++++++++++++++++-
.../ratis/netty/server/NettyServerStreamRpc.java | 12 +-----
2 files changed, 48 insertions(+), 13 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index fdd6c502e..a4cc537dd 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -50,11 +50,14 @@ import
org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +67,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -224,10 +228,31 @@ public class DataStreamManagement {
}
}
+ public static class ChannelMap {
+ private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>>
map = new ConcurrentHashMap<>();
+
+ public void add(ChannelId channelId,
+ ClientInvocationId clientInvocationId) {
+ map.computeIfAbsent(channelId, (e) -> new
ConcurrentHashMap<>()).put(clientInvocationId, clientInvocationId);
+ }
+
+ public void remove(ChannelId channelId,
+ ClientInvocationId clientInvocationId) {
+ Optional.ofNullable(map.get(channelId)).ifPresent((ids) ->
ids.remove(clientInvocationId));
+ }
+
+ public Set<ClientInvocationId> remove(ChannelId channelId) {
+ return Optional.ofNullable(map.remove(channelId))
+ .map(Map::keySet)
+ .orElse(Collections.emptySet());
+ }
+ }
+
private final RaftServer server;
private final String name;
private final StreamMap streams = new StreamMap();
+ private final ChannelMap channels;
private final Executor requestExecutor;
private final Executor writeExecutor;
@@ -237,6 +262,7 @@ public class DataStreamManagement {
this.server = server;
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
+ this.channels = new ChannelMap();
final RaftProperties properties = server.getProperties();
final boolean useCachedThreadPool =
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
this.requestExecutor =
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
@@ -383,8 +409,21 @@ public class DataStreamManagement {
}
}
- void cleanUpOnChannelInactive(ClientInvocationId key) {
- Optional.ofNullable(streams.remove(key)).ifPresent(removed ->
removed.getLocal().cleanUp());
+ void cleanUp(Set<ClientInvocationId> ids) {
+ for (ClientInvocationId clientInvocationId : ids) {
+ Optional.ofNullable(streams.remove(clientInvocationId))
+ .map(StreamInfo::getLocal)
+ .ifPresent(LocalStream::cleanUp);
+ }
+ }
+
+ void cleanUpOnChannelInactive(ChannelId channelId, TimeDuration
channelInactiveGracePeriod) {
+ // Delayed memory garbage cleanup
+ Optional.ofNullable(channels.remove(channelId)).ifPresent(ids -> {
+ LOG.info("Channel {} is inactive, cleanup clientInvocationIds={}",
channelId, ids);
+ TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod, ()
-> cleanUp(ids),
+ LOG, () -> "Timeout check failed, clientInvocationIds=" + ids);
+ });
}
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
@@ -401,6 +440,11 @@ public class DataStreamManagement {
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
final boolean close =
request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
ClientInvocationId key =
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
+
+ // add to ChannelMap
+ final ChannelId channelId = ctx.channel().id();
+ channels.add(channelId, key);
+
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
@@ -452,6 +496,7 @@ public class DataStreamManagement {
}
} finally {
request.release();
+ channels.remove(channelId, key);
}
});
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 6e1c88c72..8cb34d897 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -28,7 +28,6 @@ import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -60,7 +59,6 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -244,15 +242,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- // Delayed memory garbage cleanup
- Optional.ofNullable(requestRef.getAndSetNull()).ifPresent(request -> {
- ClientInvocationId clientInvocationId = ClientInvocationId
- .valueOf(request.getClientId(), request.getStreamId());
- TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod,
- () -> requests.cleanUpOnChannelInactive(clientInvocationId),
- LOG, () -> "Timeout check failed, clientInvocationId=" +
- clientInvocationId);
- });
+ requests.cleanUpOnChannelInactive(ctx.channel().id(),
channelInactiveGracePeriod);
}
@Override