This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 83f4ad8bfa encode supports multi message (#10794)
83f4ad8bfa is described below

commit 83f4ad8bfa2289d3873d49eac488b38daf876644
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Tue Oct 25 10:58:54 2022 +0800

    encode supports multi message (#10794)
---
 .../support/header/HeaderExchangeHandler.java      |  9 +++++++
 .../transport/netty4/Netty4BatchWriteQueue.java    | 31 ++++++++++++++++++++--
 .../transport/netty4/NettyBackedChannelBuffer.java |  1 +
 .../transport/netty4/NettyClientHandlerTest.java   |  3 +--
 .../dubbo/rpc/protocol/dubbo/DubboCountCodec.java  |  9 ++++++-
 5 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java
index 94dbd4c4fe..f6c348b2da 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java
@@ -31,6 +31,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
+import org.apache.dubbo.remoting.exchange.support.MultiMessage;
 import org.apache.dubbo.remoting.transport.ChannelHandlerDelegate;
 
 import java.net.InetSocketAddress;
@@ -150,6 +151,14 @@ public class HeaderExchangeHandler implements ChannelHandlerDelegate {
             Request request = (Request) message;
             DefaultFuture.sent(channel, request);
         }
+        if (message instanceof MultiMessage) {
+            MultiMessage multiMessage = (MultiMessage) message;
+            for (Object single : multiMessage) {
+                if (single instanceof Request) {
+                    DefaultFuture.sent(channel, ((Request) single));
+                }
+            }
+        }
         if (exception != null) {
             if (exception instanceof RuntimeException) {
                 throw (RuntimeException) exception;
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java
index da61073779..b07eae7d3f 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/Netty4BatchWriteQueue.java
@@ -18,9 +18,14 @@ package org.apache.dubbo.remoting.transport.netty4;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoop;
 import org.apache.dubbo.common.BatchExecutorQueue;
+import org.apache.dubbo.remoting.exchange.support.MultiMessage;
+
+import java.util.LinkedList;
+import java.util.Queue;
 
 /**
  * netty4 batch write queue
@@ -31,6 +36,10 @@ public class Netty4BatchWriteQueue extends BatchExecutorQueue<Netty4BatchWriteQu
 
     private final EventLoop eventLoop;
 
+    private final Queue<ChannelPromise> promises = new LinkedList<>();
+
+    private final MultiMessage multiMessage = MultiMessage.create();
+
     private Netty4BatchWriteQueue(Channel channel) {
         this.channel = channel;
         this.eventLoop = channel.eventLoop();
@@ -48,13 +57,31 @@ public class Netty4BatchWriteQueue extends BatchExecutorQueue<Netty4BatchWriteQu
 
     @Override
     protected void prepare(MessageTuple item) {
-        channel.write(item.originMessage, item.channelPromise);
+        multiMessage.addMessage(item.originMessage);
+        promises.add(item.channelPromise);
     }
 
     @Override
     protected void flush(MessageTuple item) {
         prepare(item);
-        channel.flush();
+        Object finalMessage = multiMessage;
+        if (multiMessage.size() == 1) {
+            finalMessage = multiMessage.get(0);
+        }
+        channel.writeAndFlush(finalMessage).addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                ChannelPromise cp;
+                while ((cp = promises.poll()) != null) {
+                    if (future.isSuccess()){
+                        cp.setSuccess();
+                    } else {
+                        cp.setFailure(future.cause());
+                    }
+                }
+            }
+        });
+        this.multiMessage.removeMessages();
     }
 
     public static Netty4BatchWriteQueue createWriteQueue(Channel channel) {
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java
index a64a646721..09586ed625 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java
@@ -439,6 +439,7 @@ public class NettyBackedChannelBuffer implements ChannelBuffer {
 
     @Override
     public void writerIndex(int writerIndex) {
+        buffer.ensureWritable(writerIndex);
         buffer.writerIndex(writerIndex);
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
index 1b6fed3092..ddf2853082 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
@@ -75,8 +75,7 @@ public class NettyClientHandlerTest {
         nettyClientHandler.userEventTriggered(ctx, IdleStateEvent.READER_IDLE_STATE_EVENT);
         ArgumentCaptor<Request> requestArgumentCaptor = ArgumentCaptor.forClass(Request.class);
         Thread.sleep(500);
-        Mockito.verify(channel, Mockito.times(1)).write(requestArgumentCaptor.capture(), any());
-        Mockito.verify(channel, Mockito.times(1)).flush();
+        Mockito.verify(channel, Mockito.times(1)).writeAndFlush(requestArgumentCaptor.capture());
 
 
         Request request = new Request();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java
index f2fddeb799..bc57ef829c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodec.java
@@ -44,7 +44,14 @@ public final class DubboCountCodec implements Codec2 {
 
     @Override
     public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
-        codec.encode(channel, buffer, msg);
+        if (msg instanceof MultiMessage) {
+            MultiMessage multiMessage = (MultiMessage) msg;
+            for (Object singleMessage : multiMessage) {
+                codec.encode(channel, buffer, singleMessage);
+            }
+        } else {
+            codec.encode(channel, buffer, msg);
+        }
     }
 
     @Override

Reply via email to