This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new da56f4f  RATIS-1114. Change NettyServerStreamRpc to use only one 
ConcurrentMap. (#236)
da56f4f is described below

commit da56f4f0b637d750e34ec0f3625c5e3b21409ee6
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 29 07:57:44 2020 +0800

    RATIS-1114. Change NettyServerStreamRpc to use only one ConcurrentMap. 
(#236)
---
 .../ratis/netty/server/NettyServerStreamRpc.java   | 137 +++++++++++----------
 1 file changed, 73 insertions(+), 64 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 3cf52e2..9797709 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -26,8 +26,8 @@ import 
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -36,7 +36,6 @@ import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
-import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.*;
@@ -47,7 +46,6 @@ import 
org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
 import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
-import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.PeerProxyMap;
@@ -67,7 +65,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -120,6 +117,30 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     }
   }
 
+  static class StreamInfo {
+    private final CompletableFuture<DataStream> stream;
+    private final List<DataStreamOutput> outs;
+    private final AtomicReference<CompletableFuture<?>> previous
+        = new AtomicReference<>(CompletableFuture.completedFuture(null));
+
+    StreamInfo(CompletableFuture<DataStream> stream, List<DataStreamOutput> 
outs) {
+      this.stream = stream;
+      this.outs = outs;
+    }
+
+    CompletableFuture<DataStream> getStream() {
+      return stream;
+    }
+
+    List<DataStreamOutput> getDataStreamOutputs() {
+      return outs;
+    }
+
+    AtomicReference<CompletableFuture<?>> getPrevious() {
+      return previous;
+    }
+  }
+
   private final RaftServer server;
   private final String name;
   private final EventLoopGroup bossGroup = new NioEventLoopGroup();
@@ -127,8 +148,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
   private final ChannelFuture channelFuture;
 
   private final StateMachine stateMachine;
-  private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = 
new ConcurrentHashMap<>();
-  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput 
= new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, StreamInfo> streams = new 
ConcurrentHashMap<>();
 
   private final Proxies proxies;
 
@@ -170,12 +190,12 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     proxies.addPeers(newPeers);
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
+  private StreamInfo newStreamInfo(ByteBuf buf) {
     try {
-      final RaftClientRequest request =
-          
ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
-      return stateMachine.data().stream(request);
-    } catch (InvalidProtocolBufferException e) {
+      final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
+          RaftClientRequestProto.parseFrom(buf.nioBuffer()));
+      return new StreamInfo(stateMachine.data().stream(request), 
proxies.getDataStreamOutput());
+    } catch (Throwable e) {
       throw new CompletionException(e);
     }
   }
@@ -211,64 +231,58 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
   private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
       DataStreamRequestByteBuf request, long bytesWritten, 
ChannelHandlerContext ctx) {
-    try {
       if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
         sendReplyNotSuccess(request, ctx);
       } else {
         sendReplySuccess(request, bytesWritten, ctx);
       }
-    } catch (ExecutionException | InterruptedException e) {
-      IOUtils.asIOException(e);
-    }
   }
 
-  private ChannelInboundHandler getServerHandler(){
+  private ChannelInboundHandler newChannelInboundHandlerAdapter(){
     return new ChannelInboundHandlerAdapter(){
-      private final AtomicReference<CompletableFuture<?>> previous
-          = new AtomicReference<>(CompletableFuture.completedFuture(null));
-
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException {
-        final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf) 
msg;
-        final ByteBuf buf = request.slice();
-        final boolean isHeader = request.getType() == Type.STREAM_HEADER;
-
-        final CompletableFuture<Long> localWrite = isHeader ?
-                streams.computeIfAbsent(request.getStreamId(), id -> 
getDataStreamFuture(buf)).thenApply(stream -> 0L)
-                : streams.get(request.getStreamId()).thenApply(stream -> 
writeTo(buf, stream));
-
-        final List<CompletableFuture<DataStreamReply>> remoteWrites = new 
ArrayList<>();
-        if (isHeader) {
-          // do not need to forward header request
-          final List<DataStreamOutput> outs = proxies.getDataStreamOutput();
-          peersStreamOutput.put(request.getStreamId(), outs);
-          for (DataStreamOutput out : outs) {
-            remoteWrites.add(out.getHeaderFuture());
-          }
-        } else {
-          // body
-          for (DataStreamOutput out : 
peersStreamOutput.get(request.getStreamId())) {
-            remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
-          }
-        }
-
-        final CompletableFuture<?> current = previous.get()
-            .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
-            .thenCombineAsync(localWrite, (v, bytesWritten) -> {
-              buf.release();
-              sendReply(remoteWrites, request, bytesWritten, ctx);
-              return null;
-        });
-        previous.set(current);
+      public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        read(ctx, (DataStreamRequestByteBuf)msg);
       }
     };
   }
 
+  private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf 
request) {
+    final ByteBuf buf = request.slice();
+    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
+
+    final StreamInfo info;
+    final CompletableFuture<Long> localWrite;
+    final List<CompletableFuture<DataStreamReply>> remoteWrites = new 
ArrayList<>();
+    if (isHeader) {
+      info = streams.computeIfAbsent(request.getStreamId(), id -> 
newStreamInfo(buf));
+      localWrite = CompletableFuture.completedFuture(0L);
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.getHeaderFuture());
+      }
+    } else {
+      info = streams.get(request.getStreamId());
+      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
+      }
+    }
+
+    final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
+    final CompletableFuture<?> current = previous.get()
+        .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
+        .thenCombineAsync(localWrite, (v, bytesWritten) -> {
+          buf.release();
+          sendReply(remoteWrites, request, bytesWritten, ctx);
+          return null;
+        });
+    previous.set(current);
+  }
+
   private boolean checkSuccessRemoteWrite(
-          List<CompletableFuture<DataStreamReply>> replyFutures, long 
bytesWritten)
-          throws ExecutionException, InterruptedException {
+          List<CompletableFuture<DataStreamReply>> replyFutures, long 
bytesWritten) {
     for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
-      DataStreamReply reply = replyFuture.get();
+      final DataStreamReply reply = replyFuture.join();
       if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
         return false;
       }
@@ -283,7 +297,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
         ChannelPipeline p = ch.pipeline();
         p.addLast(newDecoder());
         p.addLast(newEncoder());
-        p.addLast(getServerHandler());
+        p.addLast(newChannelInboundHandlerAdapter());
       }
     };
   }
@@ -310,10 +324,6 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     };
   }
 
-  private Channel getChannel() {
-    return channelFuture.awaitUninterruptibly().channel();
-  }
-
   @Override
   public void start() {
     channelFuture.syncUninterruptibly();
@@ -321,15 +331,14 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
   @Override
   public void close() {
-    final ChannelFuture f = getChannel().close();
-    f.syncUninterruptibly();
-    bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
-    workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
     try {
+      channelFuture.channel().close().sync();
+      bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
+      workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
       bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
       workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
-      LOG.error("Interrupt EventLoopGroup terminate", e);
+      LOG.error(this + ": Interrupted close()", e);
     }
 
     proxies.close();

Reply via email to