This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2e4e75bdc RATIS-1880. Optimize Stream client&server side channel pipeline Create (#910)
2e4e75bdc is described below
commit 2e4e75bdc9671d9ac03eae50ce02ff4aa48ced1f
Author: hao guo <[email protected]>
AuthorDate: Tue Oct 10 00:22:33 2023 +0800
RATIS-1880. Optimize Stream client&server side channel pipeline Create (#910)
---
.../ratis/netty/client/NettyClientStreamRpc.java | 53 ++++++++++++----------
.../ratis/netty/server/NettyServerStreamRpc.java | 18 ++++----
2 files changed, 39 insertions(+), 32 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index d039ca558..e4c154fd2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -40,6 +40,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import
org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
@@ -313,40 +314,44 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
if (sslContext != null) {
p.addLast("ssl", sslContext.newHandler(ch.alloc(),
address.getHostName(), address.getPort()));
}
- p.addLast(newEncoder());
- p.addLast(newEncoderDataStreamRequestFilePositionCount());
- p.addLast(newEncoderByteBuffer());
+ p.addLast(ENCODER);
+ p.addLast(ENCODER_FILE_POSITION_COUNT);
+ p.addLast(ENCODER_BYTE_BUFFER);
p.addLast(newDecoder());
p.addLast(handler);
}
};
}
- static MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
- return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
- @Override
- protected void encode(ChannelHandlerContext context,
DataStreamRequestByteBuffer request, List<Object> out) {
- NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(request,
out::add, context.alloc());
- }
- };
+ static final MessageToMessageEncoder<DataStreamRequestByteBuffer> ENCODER =
new Encoder();
+
+ @ChannelHandler.Sharable
+ static class Encoder extends
MessageToMessageEncoder<DataStreamRequestByteBuffer> {
+ @Override
+ protected void encode(ChannelHandlerContext context,
DataStreamRequestByteBuffer request, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(request,
out::add, context.alloc());
+ }
}
- static MessageToMessageEncoder<DataStreamRequestFilePositionCount>
newEncoderDataStreamRequestFilePositionCount() {
- return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
- @Override
- protected void encode(ChannelHandlerContext ctx,
DataStreamRequestFilePositionCount request, List<Object> out) {
- NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(request,
out::add, ctx.alloc());
- }
- };
+ static final MessageToMessageEncoder<DataStreamRequestFilePositionCount>
ENCODER_FILE_POSITION_COUNT
+ = new EncoderFilePositionCount();
+
+ @ChannelHandler.Sharable
+ static class EncoderFilePositionCount extends
MessageToMessageEncoder<DataStreamRequestFilePositionCount> {
+ @Override
+ protected void encode(ChannelHandlerContext ctx,
DataStreamRequestFilePositionCount request, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(request,
out::add, ctx.alloc());
+ }
}
- static MessageToMessageEncoder<ByteBuffer> newEncoderByteBuffer() {
- return new MessageToMessageEncoder<ByteBuffer>() {
- @Override
- protected void encode(ChannelHandlerContext ctx, ByteBuffer request,
List<Object> out) {
- NettyDataStreamUtils.encodeByteBuffer(request, out::add);
- }
- };
+ static final MessageToMessageEncoder<ByteBuffer> ENCODER_BYTE_BUFFER = new
EncoderByteBuffer();
+
+ @ChannelHandler.Sharable
+ static class EncoderByteBuffer extends MessageToMessageEncoder<ByteBuffer> {
+ @Override
+ protected void encode(ChannelHandlerContext ctx, ByteBuffer request,
List<Object> out) {
+ NettyDataStreamUtils.encodeByteBuffer(request, out::add);
+ }
}
static ByteToMessageDecoder newDecoder() {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 135733773..2c5356053 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -38,6 +38,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import
org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
@@ -258,7 +259,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
p.addLast("ssl", sslContext.newHandler(ch.alloc()));
}
p.addLast(newDecoder());
- p.addLast(newEncoder());
+ p.addLast(ENCODER);
p.addLast(newChannelInboundHandlerAdapter());
}
};
@@ -277,13 +278,14 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
};
}
- static MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
- return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
- @Override
- protected void encode(ChannelHandlerContext context,
DataStreamReplyByteBuffer reply, List<Object> out) {
- NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add,
context.alloc());
- }
- };
+ static final MessageToMessageEncoder<DataStreamReplyByteBuffer> ENCODER =
new Encoder();
+
+ @ChannelHandler.Sharable
+ static class Encoder extends
MessageToMessageEncoder<DataStreamReplyByteBuffer> {
+ @Override
+ protected void encode(ChannelHandlerContext context,
DataStreamReplyByteBuffer reply, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add,
context.alloc());
+ }
}
@Override