This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new 077da34  Add serialize id check for 2.6 (#7912)
077da34 is described below

commit 077da34addab54d9718710f6848669c4f0d701f1
Author: Albumen Kevin <[email protected]>
AuthorDate: Sat May 29 22:58:22 2021 +0800

    Add serialize id check for 2.6 (#7912)
---
 .../java/com/alibaba/dubbo/common/Constants.java   |  9 +++
 .../com/alibaba/dubbo/config/ServiceConfig.java    |  2 +
 .../alibaba/dubbo/remoting/exchange/Response.java  | 13 ++++
 .../remoting/exchange/codec/ExchangeCodec.java     | 40 +++++------
 .../dubbo/remoting/transport/AbstractCodec.java    | 10 +++
 .../dubbo/remoting/transport/CodecSupport.java     | 77 +++++++++++++++++++---
 .../dubbo/remoting/codec/ExchangeCodecTest.java    |  4 +-
 dubbo-rpc/dubbo-rpc-api/pom.xml                    |  5 ++
 .../java/com/alibaba/dubbo/rpc/Invocation.java     |  5 ++
 .../java/com/alibaba/dubbo/rpc/RpcInvocation.java  | 19 +++++-
 .../dubbo/rpc/protocol/AbstractInvoker.java        |  9 +++
 .../alibaba/dubbo/rpc/support/MockInvocation.java  | 14 ++++
 .../protocol/dubbo/DecodeableRpcInvocation.java    | 13 +++-
 .../rpc/protocol/dubbo/DecodeableRpcResult.java    | 15 +++++
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       | 36 ++++++++--
 .../rpc/protocol/dubbo/DubboCodecSupport.java      | 50 ++++++++++++++
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     |  2 +-
 17 files changed, 286 insertions(+), 37 deletions(-)

diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java 
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java
index 9365a4d..82bcb77 100644
--- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/Constants.java
@@ -664,4 +664,13 @@ public class Constants {
     public static final String ENABLE_NATIVE_JAVA_GENERIC_SERIALIZE = 
"dubbo.security.serialize.generic.native-java-enable";
 
     public static final String SERIALIZE_BLOCKED_LIST_FILE_PATH = 
"security/serialize.blockedlist";
+
+    public static final String DEFAULT_VERSION = "0.0.0";
+
+    public static final String SERIALIZATION_SECURITY_CHECK_KEY = 
"serialization.security.check";
+
+    public static final String SERIALIZATION_ID_KEY = "serialization_id";
+
+    public static final String INVOCATION_KEY = "invocation";
+
 }
diff --git 
a/dubbo-config/dubbo-config-api/src/main/java/com/alibaba/dubbo/config/ServiceConfig.java
 
b/dubbo-config/dubbo-config-api/src/main/java/com/alibaba/dubbo/config/ServiceConfig.java
index 705ef53..a680324 100644
--- 
a/dubbo-config/dubbo-config-api/src/main/java/com/alibaba/dubbo/config/ServiceConfig.java
+++ 
b/dubbo-config/dubbo-config-api/src/main/java/com/alibaba/dubbo/config/ServiceConfig.java
@@ -30,6 +30,7 @@ import 
com.alibaba.dubbo.config.invoker.DelegateProviderMetaDataInvoker;
 import com.alibaba.dubbo.config.model.ApplicationModel;
 import com.alibaba.dubbo.config.model.ProviderModel;
 import com.alibaba.dubbo.config.support.Parameter;
+import com.alibaba.dubbo.remoting.transport.CodecSupport;
 import com.alibaba.dubbo.rpc.Exporter;
 import com.alibaba.dubbo.rpc.Invoker;
 import com.alibaba.dubbo.rpc.Protocol;
@@ -317,6 +318,7 @@ public class ServiceConfig<T> extends AbstractServiceConfig 
{
             path = interfaceName;
         }
         doExportUrls();
+        CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), 
getExportedUrls());
         ProviderModel providerModel = new 
ProviderModel(getUniqueServiceName(), this, ref);
         ApplicationModel.initProviderModel(getUniqueServiceName(), 
providerModel);
     }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/Response.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/Response.java
index d01d20d..0b4950a 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/Response.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/Response.java
@@ -16,6 +16,9 @@
  */
 package com.alibaba.dubbo.remoting.exchange;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Response
  */
@@ -92,6 +95,8 @@ public class Response {
 
     private Object mResult;
 
+    private Map<String, Object> attributes = new HashMap<String, Object>(2);
+
     public Response() {
     }
 
@@ -164,6 +169,14 @@ public class Response {
         mErrorMsg = msg;
     }
 
+    public Object getAttribute(String key) {
+        return attributes.get(key);
+    }
+
+    public void setAttribute(String key, Object value) {
+        attributes.put(key, value);
+    }
+
     @Override
     public String toString() {
         return "Response [id=" + mId + ", version=" + mVersion + ", status=" + 
mStatus + ", event=" + mEvent
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
index f1e2bb8..81127ac 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -38,6 +38,7 @@ import com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec;
 import com.alibaba.dubbo.remoting.transport.CodecSupport;
 import com.alibaba.dubbo.remoting.transport.ExceedPayloadLimitException;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -155,9 +156,12 @@ public class ExchangeCodec extends TelnetCodec {
                 if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
-                        data = decodeHeartbeatData(channel, in);
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        data = decodeHeartbeatData(channel, 
CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                     } else if (res.isEvent()) {
-                        data = decodeEventData(channel, in);
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        data = decodeEventData(channel,
+                                CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                     } else {
                         data = decodeResponseData(channel, in, 
getRequestData(id));
                     }
@@ -182,9 +186,13 @@ public class ExchangeCodec extends TelnetCodec {
                 ObjectInput in = CodecSupport.deserialize(channel.getUrl(), 
is, proto);
                 Object data;
                 if (req.isHeartbeat()) {
-                    data = decodeHeartbeatData(channel, in);
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    data = decodeHeartbeatData(channel,
+                            CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                 } else if (req.isEvent()) {
-                    data = decodeEventData(channel, in);
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    data = decodeEventData(channel,
+                            CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                 } else {
                     data = decodeRequestData(channel, in);
                 }
@@ -340,15 +348,6 @@ public class ExchangeCodec extends TelnetCodec {
         return decodeRequestData(in);
     }
 
-    @Deprecated
-    protected Object decodeHeartbeatData(ObjectInput in) throws IOException {
-        try {
-            return in.readObject();
-        } catch (ClassNotFoundException e) {
-            throw new IOException(StringUtils.toString("Read object failed.", 
e));
-        }
-    }
-
     protected Object decodeRequestData(ObjectInput in) throws IOException {
         try {
             return in.readObject();
@@ -392,8 +391,13 @@ public class ExchangeCodec extends TelnetCodec {
         return decodeRequestData(channel, in);
     }
 
-    protected Object decodeEventData(Channel channel, ObjectInput in) throws 
IOException {
+    protected Object decodeEventData(Channel channel, ObjectInput in, byte[] 
eventPayload) throws IOException {
         try {
+            int dataLen = eventPayload.length;
+            int threshold = 
Integer.parseInt(System.getProperty("deserialization.event.size", "50"));
+            if (dataLen > threshold) {
+                throw new IllegalArgumentException("Event data too long, 
actual size " + dataLen + ", threshold " + threshold + " rejected for security 
consideration.");
+            }
             return in.readObject();
         } catch (ClassNotFoundException e) {
             throw new IOException(StringUtils.toString("Read object failed.", 
e));
@@ -401,12 +405,8 @@ public class ExchangeCodec extends TelnetCodec {
     }
 
     @Deprecated
-    protected Object decodeHeartbeatData(Channel channel, ObjectInput in) 
throws IOException {
-        try {
-            return in.readObject();
-        } catch (ClassNotFoundException e) {
-            throw new IOException(StringUtils.toString("Read object failed.", 
e));
-        }
+    protected Object decodeHeartbeatData(Channel channel, ObjectInput in, 
byte[] eventPayload) throws IOException {
+        return decodeEventData(channel, in, eventPayload);
     }
 
     protected Object decodeRequestData(Channel channel, ObjectInput in) throws 
IOException {
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java
index 76cb8ab..01fee21 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java
@@ -24,6 +24,8 @@ import com.alibaba.dubbo.common.serialize.Serialization;
 import com.alibaba.dubbo.common.utils.NetUtils;
 import com.alibaba.dubbo.remoting.Channel;
 import com.alibaba.dubbo.remoting.Codec2;
+import com.alibaba.dubbo.remoting.exchange.Request;
+import com.alibaba.dubbo.remoting.exchange.Response;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -51,6 +53,14 @@ public abstract class AbstractCodec implements Codec2 {
         return CodecSupport.getSerialization(channel.getUrl());
     }
 
+    protected Serialization getSerialization(Channel channel, Request req) {
+        return CodecSupport.getSerialization(channel.getUrl());
+    }
+
+    protected Serialization getSerialization(Channel channel, Response res) {
+        return CodecSupport.getSerialization(channel.getUrl());
+    }
+
     protected boolean isClientSide(Channel channel) {
         String side = (String) channel.getAttribute(Constants.SIDE_KEY);
         if ("client".equals(side)) {
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
index 3262e12..f8a241a 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java
@@ -24,18 +24,29 @@ import com.alibaba.dubbo.common.logger.Logger;
 import com.alibaba.dubbo.common.logger.LoggerFactory;
 import com.alibaba.dubbo.common.serialize.ObjectInput;
 import com.alibaba.dubbo.common.serialize.Serialization;
+import com.alibaba.dubbo.common.utils.CollectionUtils;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.alibaba.dubbo.common.Constants.SERIALIZATION_KEY;
 
 public class CodecSupport {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CodecSupport.class);
     private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new 
HashMap<Byte, Serialization>();
     private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new 
HashMap<Byte, String>();
+    private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new 
HashMap<String, Byte>();
+
+    private static Map<String, Set<Byte>> PROVIDER_SUPPORTED_SERIALIZATION = 
new ConcurrentHashMap<String, Set<Byte>>();
 
     static {
         Set<String> supportedExtensions = 
ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
@@ -51,6 +62,7 @@ public class CodecSupport {
             }
             ID_SERIALIZATION_MAP.put(idByte, serialization);
             ID_SERIALIZATIONNAME_MAP.put(idByte, name);
+            SERIALIZATIONNAME_ID_MAP.put(name, idByte);
         }
     }
 
@@ -63,18 +75,15 @@ public class CodecSupport {
 
     public static Serialization getSerialization(URL url) {
         return 
ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
-                url.getParameter(Constants.SERIALIZATION_KEY, 
Constants.DEFAULT_REMOTING_SERIALIZATION));
+                url.getParameter(SERIALIZATION_KEY, 
Constants.DEFAULT_REMOTING_SERIALIZATION));
     }
 
     public static Serialization getSerialization(URL url, Byte id) throws 
IOException {
-        Serialization serialization = getSerializationById(id);
-        String serializationName = 
url.getParameter(Constants.SERIALIZATION_KEY, 
Constants.DEFAULT_REMOTING_SERIALIZATION);
-        // Check if "serialization id" passed from network matches the id on 
this side(only take effect for JDK serialization), for security purpose.
-        if (serialization == null
-                || ((id == 3 || id == 7 || id == 4) && 
!(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
-            throw new IOException("Unexpected serialization id:" + id + " 
received from network, please check if the peer send the right id.");
+        Serialization result = getSerializationById(id);
+        if (result == null) {
+            throw new IOException("Unrecognized serialize type from consumer: 
" + id);
         }
-        return serialization;
+        return result;
     }
 
     public static ObjectInput deserialize(URL url, InputStream is, byte proto) 
throws IOException {
@@ -82,4 +91,56 @@ public class CodecSupport {
         return s.deserialize(url, is);
     }
 
+    /**
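+     * Reads all remaining bytes of the given stream into a byte array.
+     *
+     * @param is the input stream to drain; it is read until end of stream
+     * @return the bytes that were read from {@code is}
+     * @throws IOException if reading from {@code is} fails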
+     */
+    public static byte[] getPayload(InputStream is) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int len;
+        while ((len = is.read(buffer)) > -1) {
+            baos.write(buffer, 0, len);
+        }
+        baos.flush();
+        return baos.toByteArray();
+    }
+
+    public static Byte getIDByName(String name) {
+        return SERIALIZATIONNAME_ID_MAP.get(name);
+    }
+
+    public static void checkSerialization(String path, String version, Byte 
id) throws IOException {
+        Set<Byte> supportedSerialization = 
PROVIDER_SUPPORTED_SERIALIZATION.get(path + ":" + version);
+        if (Constants.DEFAULT_VERSION.equals(version) && 
CollectionUtils.isEmpty(supportedSerialization)) {
+            supportedSerialization = 
PROVIDER_SUPPORTED_SERIALIZATION.get(path);
+        }
+        if (CollectionUtils.isEmpty(supportedSerialization)) {
+            if (logger.isWarnEnabled()) {
+                logger.warn("Serialization security check is enabled but 
cannot work as expected because " +
+                        "there's no matched provider model for path " + path + 
", version " + version);
+            }
+        } else {
+            if (!supportedSerialization.contains(id)) {
+                throw new IOException("Unexpected serialization id:" + id + " 
received from network, please check if the peer send the right id.");
+            }
+        }
+    }
+
+    public static void addProviderSupportedSerialization(String serviceName, 
List<URL> exportedUrls) {
+        if (CollectionUtils.isNotEmpty(exportedUrls)) {
+            Set<Byte> supportedSerialization = new HashSet<Byte>();
+            for (URL url : exportedUrls) {
+                String serializationName = url.getParameter(SERIALIZATION_KEY, 
Constants.DEFAULT_REMOTING_SERIALIZATION);
+                Byte localId = SERIALIZATIONNAME_ID_MAP.get(serializationName);
+                supportedSerialization.add(localId);
+            }
+            PROVIDER_SUPPORTED_SERIALIZATION.put(serviceName, 
Collections.unmodifiableSet(supportedSerialization));
+        }
+    }
+
+
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
index 163c72c..a35f830 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -230,12 +230,14 @@ public class ExchangeCodecTest extends TelnetCodecTest {
         Person person = new Person();
         byte[] request = getRequestBytes(person, header);
 
+        System.setProperty("deserialization.event.size", "100");
         Request obj = (Request) decode(request);
         Assert.assertEquals(person, obj.getData());
         Assert.assertEquals(true, obj.isTwoWay());
         Assert.assertEquals(true, obj.isEvent());
         Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
         System.out.println(obj);
+        System.clearProperty("deserialization.event.size");
     }
 
     @Test
@@ -269,7 +271,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
     @Test
     public void test_Decode_Return_Request_Object() throws IOException {
         //|10011111|20-stats=ok|id=0|length=0
-        byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0xe2, 20, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+        byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0xc2, 20, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
         byte[] request = getRequestBytes(person, header);
 
diff --git a/dubbo-rpc/dubbo-rpc-api/pom.xml b/dubbo-rpc/dubbo-rpc-api/pom.xml
index dac691b..df74757 100644
--- a/dubbo-rpc/dubbo-rpc-api/pom.xml
+++ b/dubbo-rpc/dubbo-rpc-api/pom.xml
@@ -39,5 +39,10 @@
             <artifactId>dubbo-serialization-api</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>dubbo-remoting-api</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/Invocation.java 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/Invocation.java
index e5a0220..8c9dcb5 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/Invocation.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/Invocation.java
@@ -83,4 +83,9 @@ public interface Invocation {
      */
     Invoker<?> getInvoker();
 
+    Object put(Object key, Object value);
+
+    Object get(Object key);
+
+    Map<Object, Object> getAttributes();
 }
\ No newline at end of file
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java
index 1458131..8c9556a 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java
@@ -44,6 +44,8 @@ public class RpcInvocation implements Invocation, 
Serializable {
 
     private transient Invoker<?> invoker;
 
+    private Map<Object, Object> attributes = new HashMap<Object, Object>(2);
+
     public RpcInvocation() {
     }
 
@@ -205,10 +207,25 @@ public class RpcInvocation implements Invocation, 
Serializable {
     }
 
     @Override
+    public Object put(Object key, Object value) {
+        return attributes.put(key, value);
+    }
+
+    @Override
+    public Object get(Object key) {
+        return attributes.get(key);
+    }
+
+    @Override
+    public Map<Object, Object> getAttributes() {
+        return attributes;
+    }
+
+    @Override
     public String toString() {
         return "RpcInvocation [methodName=" + methodName + ", parameterTypes="
                 + Arrays.toString(parameterTypes) + ", arguments=" + 
Arrays.toString(arguments)
-                + ", attachments=" + attachments + "]";
+                + ", attachments=" + attachments + ", attributes=" + 
attributes + "]";
     }
 
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
index 4ab44b7..4b02f5f 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
@@ -22,6 +22,7 @@ import com.alibaba.dubbo.common.Version;
 import com.alibaba.dubbo.common.logger.Logger;
 import com.alibaba.dubbo.common.logger.LoggerFactory;
 import com.alibaba.dubbo.common.utils.NetUtils;
+import com.alibaba.dubbo.remoting.transport.CodecSupport;
 import com.alibaba.dubbo.rpc.Invocation;
 import com.alibaba.dubbo.rpc.Invoker;
 import com.alibaba.dubbo.rpc.Result;
@@ -37,6 +38,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
com.alibaba.dubbo.common.Constants.DEFAULT_REMOTING_SERIALIZATION;
+import static com.alibaba.dubbo.common.Constants.SERIALIZATION_ID_KEY;
+import static com.alibaba.dubbo.common.Constants.SERIALIZATION_KEY;
+
 /**
  * AbstractInvoker.
  */
@@ -150,6 +155,10 @@ public abstract class AbstractInvoker<T> implements 
Invoker<T> {
         }
         RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
 
+        Byte serializationId = 
CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, 
DEFAULT_REMOTING_SERIALIZATION));
+        if (serializationId != null) {
+            invocation.put(SERIALIZATION_ID_KEY, serializationId);
+        }
 
         try {
             return doInvoke(invocation);
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/com/alibaba/dubbo/rpc/support/MockInvocation.java
 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/com/alibaba/dubbo/rpc/support/MockInvocation.java
index 4d29091..e1bfdfe 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/com/alibaba/dubbo/rpc/support/MockInvocation.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/com/alibaba/dubbo/rpc/support/MockInvocation.java
@@ -63,4 +63,18 @@ public class MockInvocation implements Invocation {
         return getAttachments().get(key);
     }
 
+    @Override
+    public Object put(Object key, Object value) {
+        return null;
+    }
+
+    @Override
+    public Object get(Object key) {
+        return null;
+    }
+
+    @Override
+    public Map<Object, Object> getAttributes() {
+        return null;
+    }
 }
\ No newline at end of file
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index 4002f04..a106dd8 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -37,6 +37,8 @@ import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
+import static com.alibaba.dubbo.common.Constants.SERIALIZATION_ID_KEY;
+import static 
com.alibaba.dubbo.common.Constants.SERIALIZATION_SECURITY_CHECK_KEY;
 import static 
com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.decodeInvocationArgument;
 
 public class DecodeableRpcInvocation extends RpcInvocation implements Codec, 
Decodeable {
@@ -89,16 +91,23 @@ public class DecodeableRpcInvocation extends RpcInvocation 
implements Codec, Dec
     public Object decode(Channel channel, InputStream input) throws 
IOException {
         ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), 
serializationType)
                 .deserialize(channel.getUrl(), input);
+        this.put(SERIALIZATION_ID_KEY, serializationType);
 
         String dubboVersion = in.readUTF();
         request.setVersion(dubboVersion);
         setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);
 
-        setAttachment(Constants.PATH_KEY, in.readUTF());
-        setAttachment(Constants.VERSION_KEY, in.readUTF());
+        String path = in.readUTF();
+        setAttachment(Constants.PATH_KEY, path);
+        String version = in.readUTF();
+        setAttachment(Constants.VERSION_KEY, version);
 
         setMethodName(in.readUTF());
         try {
+            if 
(Boolean.parseBoolean(System.getProperty(SERIALIZATION_SECURITY_CHECK_KEY, 
"false"))) {
+                CodecSupport.checkSerialization(path, version, 
serializationType);
+            }
+
             Object[] args;
             Class<?>[] pts;
             String desc = in.readUTF();
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index e43080a..e24eb37 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -37,6 +37,9 @@ import java.io.OutputStream;
 import java.lang.reflect.Type;
 import java.util.Map;
 
+import static com.alibaba.dubbo.common.Constants.SERIALIZATION_ID_KEY;
+import static 
com.alibaba.dubbo.common.Constants.SERIALIZATION_SECURITY_CHECK_KEY;
+
 public class DecodeableRpcResult extends RpcResult implements Codec, 
Decodeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(DecodeableRpcResult.class);
@@ -140,6 +143,15 @@ public class DecodeableRpcResult extends RpcResult 
implements Codec, Decodeable
     public void decode() throws Exception {
         if (!hasDecoded && channel != null && inputStream != null) {
             try {
+                if 
(Boolean.parseBoolean(System.getProperty(SERIALIZATION_SECURITY_CHECK_KEY, 
"false")) && invocation != null) {
+                    Object serializationType_obj = 
invocation.get(SERIALIZATION_ID_KEY);
+                    if (serializationType_obj != null) {
+                        if (((Byte) 
serializationType_obj).compareTo(serializationType) != 0) {
+                            throw new IOException("Unexpected serialization 
id:" + serializationType + " received from network, please check if the peer 
send the right id.");
+                        }
+                    }
+                }
+
                 decode(channel, inputStream);
             } catch (Throwable e) {
                 if (log.isWarnEnabled()) {
@@ -153,4 +165,7 @@ public class DecodeableRpcResult extends RpcResult 
implements Codec, Decodeable
         }
     }
 
+    public Invocation getInvocation() {
+        return invocation;
+    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 3c12448..35b95d9 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -23,6 +23,7 @@ import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
 import com.alibaba.dubbo.common.logger.Logger;
 import com.alibaba.dubbo.common.logger.LoggerFactory;
 import com.alibaba.dubbo.common.serialize.ObjectOutput;
+import com.alibaba.dubbo.common.serialize.Serialization;
 import com.alibaba.dubbo.common.utils.ReflectUtils;
 import com.alibaba.dubbo.common.utils.StringUtils;
 import com.alibaba.dubbo.remoting.Channel;
@@ -35,6 +36,7 @@ import com.alibaba.dubbo.rpc.Invocation;
 import com.alibaba.dubbo.rpc.Result;
 import com.alibaba.dubbo.rpc.RpcInvocation;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -75,9 +77,13 @@ public class DubboCodec extends ExchangeCodec implements 
Codec2 {
                 if (status == Response.OK) {
                     Object data;
                     if (res.isHeartbeat()) {
-                        data = decodeHeartbeatData(channel,  
CodecSupport.deserialize(channel.getUrl(), is, proto));
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        data = decodeHeartbeatData(channel,
+                                CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                     } else if (res.isEvent()) {
-                        data = decodeEventData(channel,  
CodecSupport.deserialize(channel.getUrl(), is, proto));
+                        byte[] eventPayload = CodecSupport.getPayload(is);
+                        data = decodeEventData(channel,
+                                CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                     } else {
                         DecodeableRpcResult result;
                         if (channel.getUrl().getParameter(
@@ -116,9 +122,13 @@ public class DubboCodec extends ExchangeCodec implements 
Codec2 {
             try {
                 Object data;
                 if (req.isHeartbeat()) {
-                    data = decodeHeartbeatData(channel, 
CodecSupport.deserialize(channel.getUrl(), is, proto));
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    data = decodeHeartbeatData(channel,
+                            CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                 } else if (req.isEvent()) {
-                    data = decodeEventData(channel, 
CodecSupport.deserialize(channel.getUrl(), is, proto));
+                    byte[] eventPayload = CodecSupport.getPayload(is);
+                    data = decodeEventData(channel,
+                            CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                 } else {
                     DecodeableRpcInvocation inv;
                     if (channel.getUrl().getParameter(
@@ -207,4 +217,22 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
             out.writeObject(result.getAttachments());
         }
     }
+
+    @Override
+    protected Serialization getSerialization(Channel channel, Request req) {
+        if (!(req.getData() instanceof Invocation)) {
+            return super.getSerialization(channel, req);
+        }
+        return DubboCodecSupport.getRequestSerialization(channel.getUrl(), (Invocation) req.getData());
+    }
+
+    @Override
+    protected Serialization getSerialization(Channel channel, Response res) {
+        if (!(res.getResult() instanceof DecodeableRpcResult)) {
+            return super.getSerialization(channel, res);
+        }
+        return DubboCodecSupport.getResponseSerialization(channel.getUrl(), (DecodeableRpcResult) res.getResult());
+    }
+
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java
new file mode 100644
index 0000000..c89ac48
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.dubbo.rpc.protocol.dubbo;
+
+import com.alibaba.dubbo.common.Constants;
+import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.extension.ExtensionLoader;
+import com.alibaba.dubbo.common.serialize.Serialization;
+import com.alibaba.dubbo.remoting.transport.CodecSupport;
+import com.alibaba.dubbo.rpc.Invocation;
+
+import static com.alibaba.dubbo.common.Constants.SERIALIZATION_ID_KEY;
+
+public class DubboCodecSupport {
+    public static Serialization getRequestSerialization(URL url, Invocation invocation) {
+        Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+        if (serializationType_obj != null) {
+            return CodecSupport.getSerializationById((Byte) serializationType_obj);
+        }
+        return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
+                url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+    }
+
+    public static Serialization getResponseSerialization(URL url, DecodeableRpcResult result) {
+        Invocation invocation = result.getInvocation();
+        if (invocation != null) {
+            Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+            if (serializationType_obj != null) {
+                return CodecSupport.getSerializationById((Byte) serializationType_obj);
+            }
+        }
+        return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
+                url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+    }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 0c6ea37..2bdbd40 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -60,7 +60,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
         super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
         this.clients = clients;
         // get version.
-        this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
+        this.version = url.getParameter(Constants.VERSION_KEY, Constants.DEFAULT_VERSION);
         this.invokers = invokers;
     }
 

Reply via email to