This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new a293e7e7c8 Implement service-level payload (#11517)
a293e7e7c8 is described below

commit a293e7e7c86fc7b179159634cde9c32c752174fb
Author: Mengyang Tang <[email protected]>
AuthorDate: Thu Mar 2 20:14:01 2023 +0800

    Implement service-level payload (#11517)
---
 .../dubbo/common/constants/CommonConstants.java    |  2 ++
 .../apache/dubbo/config/AbstractServiceConfig.java | 12 ++++++++++
 .../dubbo/config/annotation/DubboService.java      |  5 ++++
 .../src/main/resources/META-INF/dubbo.xsd          | 10 ++++----
 .../apache/dubbo/metadata/MetadataInfoTest.java    | 17 +++++++++++++
 .../apache/dubbo/remoting/exchange/Request.java    | 16 +++++++++++--
 .../remoting/exchange/codec/ExchangeCodec.java     |  2 +-
 .../support/header/HeaderExchangeChannel.java      | 17 ++++++++-----
 .../dubbo/remoting/transport/AbstractCodec.java    | 13 ++++++++++
 .../dubbo/remoting/exchange/RequestTest.java       |  2 ++
 .../remoting/transport/AbstractCodecTest.java      | 28 ++++++++++++++++++++++
 .../rpc/protocol/dubbo/ChannelWrappedInvoker.java  | 17 +++++++++++--
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     | 19 +++++++++++++--
 13 files changed, 142 insertions(+), 18 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index a356b4cae6..62ff4beeeb 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -619,4 +619,6 @@ public interface CommonConstants {
 
     String ENCODE_IN_IO_THREAD_KEY = "encode.in.io";
     boolean DEFAULT_ENCODE_IN_IO_THREAD = false;
+
+    String PAYLOAD = "payload";
 }
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java 
b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
index 474c362cfe..6b5545c62f 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
@@ -150,6 +150,11 @@ public abstract class AbstractServiceConfig extends 
AbstractInterfaceConfig {
      */
     private Executor executor;
 
+    /**
+     * Payload max length.
+     */
+    private Integer payload;
+
     public AbstractServiceConfig() {
     }
 
@@ -377,4 +382,11 @@ public abstract class AbstractServiceConfig extends 
AbstractInterfaceConfig {
         return executor;
     }
 
+    public Integer getPayload() {
+        return payload;
+    }
+
+    public void setPayload(Integer payload) {
+        this.payload = payload;
+    }
 }
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
index c13c6c97e9..d2d3cb69f0 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
@@ -323,4 +323,9 @@ public @interface DubboService {
      * @return
      */
     String executor() default "";
+
+    /**
+     * Payload max length.
+     */
+    String payload() default "";
 }
diff --git 
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd 
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 457ebfe6d3..9241700cd4 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -371,6 +371,11 @@
                             <![CDATA[ Bean name of service executor(thread 
pool), used for thread pool isolation between services. ]]></xsd:documentation>
                     </xsd:annotation>
                 </xsd:attribute>
+                <xsd:attribute name="payload" type="xsd:string">
+                    <xsd:annotation>
+                        <xsd:documentation><![CDATA[ The max payload. 
]]></xsd:documentation>
+                    </xsd:annotation>
+                </xsd:attribute>
                 <xsd:anyAttribute namespace="##other" processContents="lax"/>
             </xsd:extension>
         </xsd:complexContent>
@@ -1594,11 +1599,6 @@
                         <xsd:documentation><![CDATA[ The protocol charset. 
]]></xsd:documentation>
                     </xsd:annotation>
                 </xsd:attribute>
-                <xsd:attribute name="payload" type="xsd:string">
-                    <xsd:annotation>
-                        <xsd:documentation><![CDATA[ The max payload. 
]]></xsd:documentation>
-                    </xsd:annotation>
-                </xsd:attribute>
                 <xsd:attribute name="buffer" type="xsd:string">
                     <xsd:annotation>
                         <xsd:documentation><![CDATA[ The buffer size. 
]]></xsd:documentation>
diff --git 
a/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
 
b/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
index 544aaa5721..3d0177ef78 100644
--- 
a/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
+++ 
b/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
@@ -36,6 +36,7 @@ import static 
org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PAYLOAD;
 import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -66,6 +67,12 @@ class MetadataInfoTest {
         
"&metadata-type=remote&methods=sayHello&sayHello.timeout=7000&pid=36621&release=&revision=1.0.0&service-name-mapping=true"
 +
         
"&side=provider&timeout=5000&timestamp=1629970068002&version=1.0.0&params-filter=-customized,excluded");
 
+    private static URL url4 = 
URL.valueOf("dubbo://30.225.21.30:20880/org.apache.dubbo.registry.service.DemoService?"
 +
+        
"REGISTRY_CLUSTER=registry1&anyhost=true&application=demo-provider2&delay=5000&deprecated=false&dubbo=2.0.2"
 +
+        
"&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.registry.service.DemoService"
 +
+        
"&metadata-type=remote&methods=sayHello&sayHello.timeout=7000&pid=36621&release=&revision=1.0.0&service-name-mapping=true"
 +
+        
"&side=provider&timeout=5000&timestamp=1629970068002&version=1.0.0&params-filter=-customized,excluded&payload=1024");
+
     @Test
     void testEmptyRevision() {
         MetadataInfo metadataInfo = new MetadataInfo("demo");
@@ -215,4 +222,14 @@ class MetadataInfoTest {
         assertNull(ret.get("content"));
         assertNull(ret.get("rawMetadataInfo"));
     }
+
+    @Test
+    void testPayload() {
+        MetadataInfo metadataInfo = new MetadataInfo("demo");
+
+        metadataInfo.addService(url4);
+        MetadataInfo.ServiceInfo serviceInfo4 = 
metadataInfo.getServiceInfo(url4.getProtocolServiceKey());
+        assertNotNull(serviceInfo4);
+        assertEquals("1024", serviceInfo4.getParameter(PAYLOAD));
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
index d75a21a323..4e47aed49d 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Request.java
@@ -39,6 +39,8 @@ public class Request {
 
     private boolean mBroken = false;
 
+    private int mPayload;
+
     private Object mData;
 
     public Request() {
@@ -61,7 +63,7 @@ public class Request {
 
         try {
             return data.toString();
-        } catch (Throwable e) {
+        } catch (Exception e) {
             return "<Fail toString of " + data.getClass() + ", cause: " + 
StringUtils.toString(e) + ">";
         }
     }
@@ -107,6 +109,14 @@ public class Request {
         this.mBroken = mBroken;
     }
 
+    public int getPayload() {
+        return mPayload;
+    }
+
+    public void setPayload(int mPayload) {
+        this.mPayload = mPayload;
+    }
+
     public Object getData() {
         return mData;
     }
@@ -131,6 +141,7 @@ public class Request {
         copy.mTwoWay = this.mTwoWay;
         copy.mEvent = this.mEvent;
         copy.mBroken = this.mBroken;
+        copy.mPayload = this.mPayload;
         copy.mData = this.mData;
         return copy;
     }
@@ -141,12 +152,13 @@ public class Request {
         copy.mTwoWay = this.mTwoWay;
         copy.mEvent = this.mEvent;
         copy.mBroken = this.mBroken;
+        copy.mPayload = this.mPayload;
         return copy;
     }
 
     @Override
     public String toString() {
         return "Request [id=" + mId + ", version=" + mVersion + ", twoWay=" + 
mTwoWay + ", event=" + mEvent
-            + ", broken=" + mBroken + ", data=" + (mData == this ? "this" : 
safeToString(mData)) + "]";
+            + ", broken=" + mBroken + ", mPayload=" + mPayload + ", data=" + 
(mData == this ? "this" : safeToString(mData)) + "]";
     }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 9937c573fb..7a92b0202a 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -273,7 +273,7 @@ public class ExchangeCodec extends TelnetCodec {
         bos.flush();
         bos.close();
         int len = bos.writtenBytes();
-        checkPayload(channel, len);
+        checkPayload(channel, req.getPayload(), len);
         Bytes.int2bytes(len, header, 12);
 
         // write
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
index 787130ed56..ab576db76a 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
@@ -126,11 +126,16 @@ final class HeaderExchangeChannel implements 
ExchangeChannel {
         if (closed) {
             throw new RemotingException(this.getLocalAddress(), null, "Failed 
to send request " + request + ", cause: The channel " + this + " is closed!");
         }
-        // create request.
-        Request req = new Request();
-        req.setVersion(Version.getProtocolVersion());
-        req.setTwoWay(true);
-        req.setData(request);
+        Request req;
+        if (request instanceof Request) {
+            req = (Request) request;
+        } else {
+            // create request.
+            req = new Request();
+            req.setVersion(Version.getProtocolVersion());
+            req.setTwoWay(true);
+            req.setData(request);
+        }
         DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, 
executor);
         try {
             channel.send(req);
@@ -156,7 +161,7 @@ final class HeaderExchangeChannel implements 
ExchangeChannel {
             // graceful close
             DefaultFuture.closeChannel(channel);
             channel.close();
-        } catch (Throwable e) {
+        } catch (Exception e) {
             logger.warn(TRANSPORT_FAILED_CLOSE, "", "", e.getMessage(), e);
         }
     }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
index aabf6ee3b0..144e352bb2 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
@@ -63,6 +63,19 @@ public abstract class AbstractCodec implements Codec2, 
ScopeModelAware {
         }
     }
 
+    protected static void checkPayload(Channel channel, int payload, long 
size) throws IOException {
+        if (payload <= 0) {
+            payload = getPayload(channel);
+        }
+        boolean overPayload = isOverPayload(payload, size);
+        if (overPayload) {
+            ExceedPayloadLimitException e = new ExceedPayloadLimitException(
+                "Data length too large: " + size + ", max payload: " + payload 
+ ", channel: " + channel);
+            logger.error(TRANSPORT_EXCEED_PAYLOAD_LIMIT, "", "", 
e.getMessage(), e);
+            throw e;
+        }
+    }
+
     protected static int getPayload(Channel channel) {
         if (channel != null && channel.getUrl() != null) {
             return channel.getUrl().getParameter(Constants.PAYLOAD_KEY, 
Constants.DEFAULT_PAYLOAD);
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/RequestTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/RequestTest.java
index f0a8988ee7..138ffd8955 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/RequestTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/RequestTest.java
@@ -29,6 +29,7 @@ class RequestTest {
         request.setVersion("1.0.0");
         request.setEvent(true);
         request.setData("data");
+        request.setPayload(1024);
 
         Assertions.assertTrue(request.isTwoWay());
         Assertions.assertTrue(request.isBroken());
@@ -36,6 +37,7 @@ class RequestTest {
         Assertions.assertEquals(request.getVersion(), "1.0.0");
         Assertions.assertEquals(request.getData(), "data");
         Assertions.assertTrue(request.getId() >= 0);
+        Assertions.assertEquals(1024, request.getPayload());
 
         request.setHeartbeat(true);
         Assertions.assertTrue(request.isHeartbeat());
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/AbstractCodecTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/AbstractCodecTest.java
index 7dd38a60d2..973af20a58 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/AbstractCodecTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/AbstractCodecTest.java
@@ -55,6 +55,34 @@ class AbstractCodecTest {
         verify(channel, VerificationModeFactory.atLeastOnce()).getUrl();
     }
 
+    @Test
+    void testCheckProviderPayload() throws Exception {
+        Channel channel = mock(Channel.class);
+        given(channel.getUrl()).willReturn(URL.valueOf("dubbo://1.1.1.1"));
+
+        AbstractCodec.checkPayload(channel, 1024 * 1024 + 1, 1024 * 1024);
+
+        try {
+            AbstractCodec.checkPayload(channel, 1024 * 1024, 1024 * 1024);
+        } catch (IOException expected) {
+            assertThat(expected.getMessage(), allOf(
+                containsString("Data length too large: "),
+                containsString("max payload: " + 1024 * 1024)
+            ));
+        }
+
+        try {
+            AbstractCodec.checkPayload(channel, 0, 15 * 1024 * 1024);
+        } catch (IOException expected) {
+            assertThat(expected.getMessage(), allOf(
+                containsString("Data length too large: "),
+                containsString("max payload: " + 8 * 1024 * 1024)
+            ));
+        }
+
+        verify(channel, VerificationModeFactory.atLeastOnce()).getUrl();
+    }
+
     @Test
     void tesCheckPayloadMinusPayloadNoLimit() throws Exception {
         Channel channel = mock(Channel.class);
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
index 120c3f8d42..1b1b9a5999 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
@@ -17,11 +17,13 @@
 package org.apache.dubbo.rpc.protocol.dubbo;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.Version;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
+import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient;
 import org.apache.dubbo.remoting.transport.ClientDelegate;
 import org.apache.dubbo.rpc.AppResponse;
@@ -38,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PAYLOAD;
 import static org.apache.dubbo.remoting.Constants.SENT_KEY;
 import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
 import static 
org.apache.dubbo.rpc.protocol.dubbo.Constants.CALLBACK_SERVICE_KEY;
@@ -60,18 +63,28 @@ class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     protected Result doInvoke(Invocation invocation) throws Throwable {
         RpcInvocation inv = (RpcInvocation) invocation;
         // use interface's name as service path to export if it's not found on 
client side
         inv.setAttachment(PATH_KEY, getInterface().getName());
         inv.setAttachment(CALLBACK_SERVICE_KEY, serviceKey);
 
+        Integer payload = getUrl().getParameter(PAYLOAD, Integer.class);
+
+        Request request = new Request();
+        if (payload != null) {
+            request.setPayload(payload);
+        }
+        request.setData(inv);
+        request.setVersion(Version.getProtocolVersion());
+
         try {
             if (RpcUtils.isOneway(getUrl(), inv)) { // may have concurrency 
issue
-                currentClient.send(inv, 
getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false));
+                currentClient.send(request, 
getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false));
                 return AsyncRpcResult.newDefaultAsyncResult(invocation);
             } else {
-                CompletableFuture<AppResponse> appResponseFuture = 
currentClient.request(inv).thenApply(obj -> (AppResponse) obj);
+                CompletableFuture<AppResponse> appResponseFuture = 
currentClient.request(request).thenApply(AppResponse.class::cast);
                 return new AsyncRpcResult(appResponseFuture, inv);
             }
         } catch (RpcException e) {
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 8204d3b033..91d0976ac8 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -17,12 +17,14 @@
 package org.apache.dubbo.rpc.protocol.dubbo;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.utils.AtomicPositiveInteger;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
+import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.FutureContext;
@@ -45,6 +47,7 @@ import static 
org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PAYLOAD;
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_CLIENT;
 import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
 
@@ -99,14 +102,26 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
             }
 
             invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
+
+            Integer payload = getUrl().getParameter(PAYLOAD, Integer.class);
+
+            Request request = new Request();
+            if (payload != null) {
+                request.setPayload(payload);
+            }
+            request.setData(inv);
+            request.setVersion(Version.getProtocolVersion());
+
             if (isOneway) {
                 boolean isSent = getUrl().getMethodParameter(methodName, 
Constants.SENT_KEY, false);
-                currentClient.send(inv, isSent);
+                request.setTwoWay(false);
+                currentClient.send(request, isSent);
                 return AsyncRpcResult.newDefaultAsyncResult(invocation);
             } else {
+                request.setTwoWay(true);
                 ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                 CompletableFuture<AppResponse> appResponseFuture =
-                    currentClient.request(inv, timeout, 
executor).thenApply(obj -> (AppResponse) obj);
+                    currentClient.request(request, timeout, 
executor).thenApply(AppResponse.class::cast);
                 // save for 2.6.x compatibility, for example, TraceFilter in 
Zipkin uses com.alibaba.xxx.FutureAdapter
                 
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                 AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, 
inv);

Reply via email to