This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 83f4ad8bfa encode supports multi message (#10794)
83f4ad8bfa is described below
commit 83f4ad8bfa2289d3873d49eac488b38daf876644
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Tue Oct 25 10:58:54 2022 +0800
encode supports multi message (#10794)
---
.../support/header/HeaderExchangeHandler.java | 9 +++++++
.../transport/netty4/Netty4BatchWriteQueue.java | 31 ++++++++++++++++++++--
.../transport/netty4/NettyBackedChannelBuffer.java | 1 +
.../transport/netty4/NettyClientHandlerTest.java | 3 +--
.../dubbo/rpc/protocol/dubbo/DubboCountCodec.java | 9 ++++++-
5 files changed, 48 insertions(+), 5 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java
index 94dbd4c4fe..f6c348b2da 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java
@@ -31,6 +31,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
+import org.apache.dubbo.remoting.exchange.support.MultiMessage;
import org.apache.dubbo.remoting.transport.ChannelHandlerDelegate;
import java.net.InetSocketAddress;
@@ -150,6 +151,14 @@ public class HeaderExchangeHandler implements ChannelHandlerDelegate {
Request request = (Request) message;
DefaultFuture.sent(channel, request);
}
+ if (message instanceof MultiMessage) {
+ MultiMessage multiMessage = (MultiMessage) message;
+ for (Object single : multiMessage) {
+ if (single instanceof Request) {
+ DefaultFuture.sent(channel, ((Request) single));
+ }
+ }
+ }
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java
index da61073779..b07eae7d3f 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java
@@ -18,9 +18,14 @@ package org.apache.dubbo.remoting.transport.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import org.apache.dubbo.common.BatchExecutorQueue;
+import org.apache.dubbo.remoting.exchange.support.MultiMessage;
+
+import java.util.LinkedList;
+import java.util.Queue;
/**
* netty4 batch write queue
@@ -31,6 +36,10 @@ public class Netty4BatchWriteQueue extends BatchExecutorQueue<Netty4BatchWriteQu
private final EventLoop eventLoop;
+ private final Queue<ChannelPromise> promises = new LinkedList<>();
+
+ private final MultiMessage multiMessage = MultiMessage.create();
+
private Netty4BatchWriteQueue(Channel channel) {
this.channel = channel;
this.eventLoop = channel.eventLoop();
@@ -48,13 +57,31 @@ public class Netty4BatchWriteQueue extends BatchExecutorQueue<Netty4BatchWriteQu
@Override
protected void prepare(MessageTuple item) {
- channel.write(item.originMessage, item.channelPromise);
+ multiMessage.addMessage(item.originMessage);
+ promises.add(item.channelPromise);
}
@Override
protected void flush(MessageTuple item) {
prepare(item);
- channel.flush();
+ Object finalMessage = multiMessage;
+ if (multiMessage.size() == 1) {
+ finalMessage = multiMessage.get(0);
+ }
+ channel.writeAndFlush(finalMessage).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ ChannelPromise cp;
+ while ((cp = promises.poll()) != null) {
+ if (future.isSuccess()){
+ cp.setSuccess();
+ } else {
+ cp.setFailure(future.cause());
+ }
+ }
+ }
+ });
+ this.multiMessage.removeMessages();
}
public static Netty4BatchWriteQueue createWriteQueue(Channel channel) {
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java
index a64a646721..09586ed625 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java
@@ -439,6 +439,7 @@ public class NettyBackedChannelBuffer implements ChannelBuffer {
@Override
public void writerIndex(int writerIndex) {
+ buffer.ensureWritable(writerIndex);
buffer.writerIndex(writerIndex);
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
index 1b6fed3092..ddf2853082 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
@@ -75,8 +75,7 @@ public class NettyClientHandlerTest {
nettyClientHandler.userEventTriggered(ctx, IdleStateEvent.READER_IDLE_STATE_EVENT);
ArgumentCaptor<Request> requestArgumentCaptor = ArgumentCaptor.forClass(Request.class);
Thread.sleep(500);
- Mockito.verify(channel, Mockito.times(1)).write(requestArgumentCaptor.capture(), any());
- Mockito.verify(channel, Mockito.times(1)).flush();
+ Mockito.verify(channel, Mockito.times(1)).writeAndFlush(requestArgumentCaptor.capture());
Request request = new Request();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java
index f2fddeb799..bc57ef829c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java
@@ -44,7 +44,14 @@ public final class DubboCountCodec implements Codec2 {
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
- codec.encode(channel, buffer, msg);
+ if (msg instanceof MultiMessage) {
+ MultiMessage multiMessage = (MultiMessage) msg;
+ for (Object singleMessage : multiMessage) {
+ codec.encode(channel, buffer, singleMessage);
+ }
+ } else {
+ codec.encode(channel, buffer, msg);
+ }
}
@Override