This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8adc4ffc7 RATIS-1850. When the stream server side receives a disconnection, memory is cleared (#887)
8adc4ffc7 is described below

commit 8adc4ffc7bd0b97c75286e3ed991dcbf3cd7907a
Author: hao guo <[email protected]>
AuthorDate: Wed Jun 21 15:49:24 2023 +0800

    RATIS-1850. When the stream server side receives a disconnection, memory is cleared (#887)
---
 .../org/apache/ratis/netty/NettyConfigKeys.java     | 11 +++++++++++
 .../ratis/netty/server/DataStreamManagement.java    |  4 ++++
 .../ratis/netty/server/NettyServerStreamRpc.java    | 21 +++++++++++++++++++++
 3 files changed, 36 insertions(+)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 07e662162..81a381c9c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -26,6 +26,7 @@ import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.conf.ConfUtils.*;
@@ -181,6 +182,16 @@ public interface NettyConfigKeys {
       static void setWorkerGroupSize(RaftProperties properties, int num) {
         setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, num);
       }
+
+      String CHANNEL_INACTIVE_GRACE_PERIOD_KEY = PREFIX + ".channel.inactive.grace-period";
+      TimeDuration CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MINUTES);
+      static TimeDuration channelInactiveGracePeriod(RaftProperties properties) {
+        return getTimeDuration(properties.getTimeDuration(CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT.getUnit()),
+            CHANNEL_INACTIVE_GRACE_PERIOD_KEY, CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT, getDefaultLog());
+      }
+      static void setChannelInactiveGracePeriod(RaftProperties properties, TimeDuration timeoutDuration) {
+        setTimeDuration(properties::setTimeDuration, CHANNEL_INACTIVE_GRACE_PERIOD_KEY, timeoutDuration);
+      }
     }
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index b3f42bf32..fdd6c502e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -383,6 +383,10 @@ public class DataStreamManagement {
     }
   }
 
+  void cleanUpOnChannelInactive(ClientInvocationId key) {
+    Optional.ofNullable(streams.remove(key)).ifPresent(removed -> removed.getLocal().cleanUp());
+  }
+
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
       CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
     LOG.debug("{}: read {}", this, request);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 70cb47026..6e1c88c72 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -28,6 +28,7 @@ import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.netty.NettyUtils;
 import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamPacket;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -59,6 +60,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -156,12 +158,18 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   private final NettyServerStreamRpcMetrics metrics;
 
+  private final TimeDuration channelInactiveGracePeriod;
+
   public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
     this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.metrics = new NettyServerStreamRpcMetrics(this.name);
     this.requests = new DataStreamManagement(server, metrics);
 
     final RaftProperties properties = server.getProperties();
+
+    this.channelInactiveGracePeriod = NettyConfigKeys.DataStream.Server
+        .channelInactiveGracePeriod(properties);
+
     this.proxies = new ProxiesPool(name, properties, parameters);
 
     final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
@@ -234,6 +242,19 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         }
       }
 
+      @Override
+      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // Delayed memory garbage cleanup
+        Optional.ofNullable(requestRef.getAndSetNull()).ifPresent(request -> {
+          ClientInvocationId clientInvocationId = ClientInvocationId
+              .valueOf(request.getClientId(), request.getStreamId());
+          TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod,
+              () -> requests.cleanUpOnChannelInactive(clientInvocationId),
+              LOG, () -> "Timeout check failed, clientInvocationId=" +
+                  clientInvocationId);
+        });
+      }
+
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
         Optional.ofNullable(requestRef.getAndSetNull())

Reply via email to