This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e4e75bdc RATIS-1880. Optimize Stream client&server side channel pipeline Create (#910)
2e4e75bdc is described below

commit 2e4e75bdc9671d9ac03eae50ce02ff4aa48ced1f
Author: hao guo <[email protected]>
AuthorDate: Tue Oct 10 00:22:33 2023 +0800

    RATIS-1880. Optimize Stream client&server side channel pipeline Create (#910)
---
 .../ratis/netty/client/NettyClientStreamRpc.java   | 53 ++++++++++++----------
 .../ratis/netty/server/NettyServerStreamRpc.java   | 18 ++++----
 2 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index d039ca558..e4c154fd2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -40,6 +40,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.Channel;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
 import 
org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
@@ -313,40 +314,44 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         if (sslContext != null) {
           p.addLast("ssl", sslContext.newHandler(ch.alloc(), 
address.getHostName(), address.getPort()));
         }
-        p.addLast(newEncoder());
-        p.addLast(newEncoderDataStreamRequestFilePositionCount());
-        p.addLast(newEncoderByteBuffer());
+        p.addLast(ENCODER);
+        p.addLast(ENCODER_FILE_POSITION_COUNT);
+        p.addLast(ENCODER_BYTE_BUFFER);
         p.addLast(newDecoder());
         p.addLast(handler);
       }
     };
   }
 
-  static MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
-    return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
-      @Override
-      protected void encode(ChannelHandlerContext context, 
DataStreamRequestByteBuffer request, List<Object> out) {
-        NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(request, 
out::add, context.alloc());
-      }
-    };
+  static final MessageToMessageEncoder<DataStreamRequestByteBuffer> ENCODER = 
new Encoder();
+
+  @ChannelHandler.Sharable
+  static class Encoder extends 
MessageToMessageEncoder<DataStreamRequestByteBuffer> {
+    @Override
+    protected void encode(ChannelHandlerContext context, 
DataStreamRequestByteBuffer request, List<Object> out) {
+      NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(request, 
out::add, context.alloc());
+    }
   }
 
-  static MessageToMessageEncoder<DataStreamRequestFilePositionCount> 
newEncoderDataStreamRequestFilePositionCount() {
-    return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
-      @Override
-      protected void encode(ChannelHandlerContext ctx, 
DataStreamRequestFilePositionCount request, List<Object> out) {
-        NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(request, 
out::add, ctx.alloc());
-      }
-    };
+  static final MessageToMessageEncoder<DataStreamRequestFilePositionCount> 
ENCODER_FILE_POSITION_COUNT
+      = new EncoderFilePositionCount();
+
+  @ChannelHandler.Sharable
+  static class EncoderFilePositionCount extends 
MessageToMessageEncoder<DataStreamRequestFilePositionCount> {
+    @Override
+    protected void encode(ChannelHandlerContext ctx, 
DataStreamRequestFilePositionCount request, List<Object> out) {
+      NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(request, 
out::add, ctx.alloc());
+    }
   }
 
-  static MessageToMessageEncoder<ByteBuffer> newEncoderByteBuffer() {
-    return new MessageToMessageEncoder<ByteBuffer>() {
-      @Override
-      protected void encode(ChannelHandlerContext ctx, ByteBuffer request, 
List<Object> out) {
-        NettyDataStreamUtils.encodeByteBuffer(request, out::add);
-      }
-    };
+  static final MessageToMessageEncoder<ByteBuffer> ENCODER_BYTE_BUFFER = new 
EncoderByteBuffer();
+
+  @ChannelHandler.Sharable
+  static class EncoderByteBuffer extends MessageToMessageEncoder<ByteBuffer> {
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuffer request, 
List<Object> out) {
+      NettyDataStreamUtils.encodeByteBuffer(request, out::add);
+    }
   }
 
   static ByteToMessageDecoder newDecoder() {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 135733773..2c5356053 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -38,6 +38,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
 import 
org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
@@ -258,7 +259,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
           p.addLast("ssl", sslContext.newHandler(ch.alloc()));
         }
         p.addLast(newDecoder());
-        p.addLast(newEncoder());
+        p.addLast(ENCODER);
         p.addLast(newChannelInboundHandlerAdapter());
       }
     };
@@ -277,13 +278,14 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     };
   }
 
-  static MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
-    return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
-      @Override
-      protected void encode(ChannelHandlerContext context, 
DataStreamReplyByteBuffer reply, List<Object> out) {
-        NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add, 
context.alloc());
-      }
-    };
+  static final MessageToMessageEncoder<DataStreamReplyByteBuffer> ENCODER = 
new Encoder();
+
+  @ChannelHandler.Sharable
+  static class Encoder extends 
MessageToMessageEncoder<DataStreamReplyByteBuffer> {
+    @Override
+    protected void encode(ChannelHandlerContext context, 
DataStreamReplyByteBuffer reply, List<Object> out) {
+      NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add, 
context.alloc());
+    }
   }
 
   @Override

Reply via email to