This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8adc4ffc7 RATIS-1850. When the stream server side receives a
disconnection, memory is cleared (#887)
8adc4ffc7 is described below
commit 8adc4ffc7bd0b97c75286e3ed991dcbf3cd7907a
Author: hao guo <[email protected]>
AuthorDate: Wed Jun 21 15:49:24 2023 +0800
RATIS-1850. When the stream server side receives a disconnection, memory is
cleared (#887)
---
.../org/apache/ratis/netty/NettyConfigKeys.java | 11 +++++++++++
.../ratis/netty/server/DataStreamManagement.java | 4 ++++
.../ratis/netty/server/NettyServerStreamRpc.java | 21 +++++++++++++++++++++
3 files changed, 36 insertions(+)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 07e662162..81a381c9c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -26,6 +26,7 @@ import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.apache.ratis.conf.ConfUtils.*;
@@ -181,6 +182,16 @@ public interface NettyConfigKeys {
static void setWorkerGroupSize(RaftProperties properties, int num) {
setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, num);
}
+
+ String CHANNEL_INACTIVE_GRACE_PERIOD_KEY = PREFIX +
".channel.inactive.grace-period";
+ TimeDuration CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT =
TimeDuration.valueOf(10, TimeUnit.MINUTES);
+ static TimeDuration channelInactiveGracePeriod(RaftProperties
properties) {
+ return
getTimeDuration(properties.getTimeDuration(CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT.getUnit()),
+ CHANNEL_INACTIVE_GRACE_PERIOD_KEY,
CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT, getDefaultLog());
+ }
+ static void setChannelInactiveGracePeriod(RaftProperties properties,
TimeDuration timeoutDuration) {
+ setTimeDuration(properties::setTimeDuration,
CHANNEL_INACTIVE_GRACE_PERIOD_KEY, timeoutDuration);
+ }
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index b3f42bf32..fdd6c502e 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -383,6 +383,10 @@ public class DataStreamManagement {
}
}
+ void cleanUpOnChannelInactive(ClientInvocationId key) {
+ Optional.ofNullable(streams.remove(key)).ifPresent(removed ->
removed.getLocal().cleanUp());
+ }
+
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>,
Set<DataStreamOutputRpc>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 70cb47026..6e1c88c72 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -28,6 +28,7 @@ import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -59,6 +60,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -156,12 +158,18 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private final NettyServerStreamRpcMetrics metrics;
+ private final TimeDuration channelInactiveGracePeriod;
+
public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
this.metrics = new NettyServerStreamRpcMetrics(this.name);
this.requests = new DataStreamManagement(server, metrics);
final RaftProperties properties = server.getProperties();
+
+ this.channelInactiveGracePeriod = NettyConfigKeys.DataStream.Server
+ .channelInactiveGracePeriod(properties);
+
this.proxies = new ProxiesPool(name, properties, parameters);
final boolean useEpoll =
NettyConfigKeys.DataStream.Server.useEpoll(properties);
@@ -234,6 +242,19 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
}
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ // Delayed memory garbage cleanup
+ Optional.ofNullable(requestRef.getAndSetNull()).ifPresent(request -> {
+ ClientInvocationId clientInvocationId = ClientInvocationId
+ .valueOf(request.getClientId(), request.getStreamId());
+ TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod,
+ () -> requests.cleanUpOnChannelInactive(clientInvocationId),
+ LOG, () -> "Timeout check failed, clientInvocationId=" +
+ clientInvocationId);
+ });
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
throwable) {
Optional.ofNullable(requestRef.getAndSetNull())