This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 58e89cb7e RATIS-1853. On stream server channelInactive, clean up all 
requests in the channel. (#889)
58e89cb7e is described below

commit 58e89cb7e1d50fb00f5798bba419f07d9aee4f0b
Author: hao guo <[email protected]>
AuthorDate: Sat Jul 1 13:16:26 2023 +0800

    RATIS-1853. On stream server channelInactive, clean up all requests in the 
channel. (#889)
---
 .../ratis/netty/server/DataStreamManagement.java   | 49 +++++++++++++++++++++-
 .../ratis/netty/server/NettyServerStreamRpc.java   | 12 +-----
 2 files changed, 48 insertions(+), 13 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index fdd6c502e..a4cc537dd 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -50,11 +50,14 @@ import 
org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
 import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.apache.ratis.util.function.CheckedBiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +67,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -224,10 +228,31 @@ public class DataStreamManagement {
     }
   }
 
+  public static class ChannelMap {
+    private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> 
map = new ConcurrentHashMap<>();
+
+    public void add(ChannelId channelId,
+                    ClientInvocationId clientInvocationId) {
+      map.computeIfAbsent(channelId, (e) -> new 
ConcurrentHashMap<>()).put(clientInvocationId, clientInvocationId);
+    }
+
+    public void remove(ChannelId channelId,
+                       ClientInvocationId clientInvocationId) {
+      Optional.ofNullable(map.get(channelId)).ifPresent((ids) -> 
ids.remove(clientInvocationId));
+    }
+
+    public Set<ClientInvocationId> remove(ChannelId channelId) {
+      return Optional.ofNullable(map.remove(channelId))
+          .map(Map::keySet)
+          .orElse(Collections.emptySet());
+    }
+  }
+
   private final RaftServer server;
   private final String name;
 
   private final StreamMap streams = new StreamMap();
+  private final ChannelMap channels;
   private final Executor requestExecutor;
   private final Executor writeExecutor;
 
@@ -237,6 +262,7 @@ public class DataStreamManagement {
     this.server = server;
     this.name = server.getId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
 
+    this.channels = new ChannelMap();
     final RaftProperties properties = server.getProperties();
     final boolean useCachedThreadPool = 
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
     this.requestExecutor = 
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
@@ -383,8 +409,21 @@ public class DataStreamManagement {
     }
   }
 
-  void cleanUpOnChannelInactive(ClientInvocationId key) {
-    Optional.ofNullable(streams.remove(key)).ifPresent(removed -> 
removed.getLocal().cleanUp());
+  void cleanUp(Set<ClientInvocationId> ids) {
+    for (ClientInvocationId clientInvocationId : ids) {
+      Optional.ofNullable(streams.remove(clientInvocationId))
+          .map(StreamInfo::getLocal)
+          .ifPresent(LocalStream::cleanUp);
+    }
+  }
+
+  void cleanUpOnChannelInactive(ChannelId channelId, TimeDuration 
channelInactiveGracePeriod) {
+    // Delayed memory garbage cleanup
+    Optional.ofNullable(channels.remove(channelId)).ifPresent(ids -> {
+      LOG.info("Channel {} is inactive, cleanup clientInvocationIds={}", 
channelId, ids);
+      TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod, () 
-> cleanUp(ids),
+          LOG, () -> "Timeout check failed, clientInvocationIds=" + ids);
+    });
   }
 
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
@@ -401,6 +440,11 @@ public class DataStreamManagement {
       CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams) {
     final boolean close = 
request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
     ClientInvocationId key =  
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
+
+    // add to ChannelMap
+    final ChannelId channelId = ctx.channel().id();
+    channels.add(channelId, key);
+
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
       final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
@@ -452,6 +496,7 @@ public class DataStreamManagement {
         }
       } finally {
         request.release();
+        channels.remove(channelId, key);
       }
     });
   }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 6e1c88c72..8cb34d897 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -28,7 +28,6 @@ import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.netty.NettyUtils;
 import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamPacket;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -60,7 +59,6 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutExecutor;
 import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -244,15 +242,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
       @Override
       public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        // Delayed memory garbage cleanup
-        Optional.ofNullable(requestRef.getAndSetNull()).ifPresent(request -> {
-          ClientInvocationId clientInvocationId = ClientInvocationId
-              .valueOf(request.getClientId(), request.getStreamId());
-          TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod,
-              () -> requests.cleanUpOnChannelInactive(clientInvocationId),
-              LOG, () -> "Timeout check failed, clientInvocationId=" +
-                  clientInvocationId);
-        });
+        requests.cleanUpOnChannelInactive(ctx.channel().id(), 
channelInactiveGracePeriod);
       }
 
       @Override

Reply via email to