This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 638f37538f Restores PackableMethod serialization mode (#14323)
638f37538f is described below

commit 638f37538f074c4eba80311b6619a89f6b9ffd93
Author: TomlongTK <[email protected]>
AuthorDate: Tue Jul 23 11:27:15 2024 +0800

    Restores PackableMethod serialization mode (#14323)
    
    * Restore the serialization mode of PackableMethod
    
    * Add a custom hook that is invoked when the method descriptor is set
    
    * Format code
    
    ---------
    
    Co-authored-by: earthchen <[email protected]>
---
 .../http12/h2/Http2ServerChannelObserver.java      |   4 +-
 .../message/LengthFieldStreamingDecoder.java       |  12 +-
 .../remoting/http12/message/StreamingDecoder.java  |   7 -
 .../dubbo/rpc/protocol/tri/DescriptorUtils.java    |   7 +-
 .../tri/h12/AbstractServerTransportListener.java   |   3 +
 .../protocol/tri/h12/grpc/GrpcCompositeCodec.java  | 119 ++++++++--------
 .../tri/h12/grpc/GrpcCompositeCodecFactory.java    |   7 +-
 .../h12/grpc/GrpcHttp2ServerTransportListener.java |  39 ++----
 .../tri/h12/grpc/ProtobufHttpMessageCodec.java     |  56 --------
 .../tri/h12/grpc/WrapperHttpMessageCodec.java      | 154 ---------------------
 .../http2/GenericHttp2ServerTransportListener.java |   4 +-
 11 files changed, 89 insertions(+), 323 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index bd43f2ad92..c3c8f8c9a8 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -79,7 +79,9 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
                 closed();
             }
         }
-        this.cancellationContext.cancel(throwable);
+        if (cancellationContext != null) {
+            cancellationContext.cancel(throwable);
+        }
         long errorCode = 0;
         if (throwable instanceof ErrorCodeHolder) {
             errorCode = ((ErrorCodeHolder) throwable).getErrorCode();
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index 5272a701be..77821ea431 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -16,12 +16,10 @@
  */
 package org.apache.dubbo.remoting.http12.message;
 
-import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.remoting.http12.CompositeInputStream;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -47,8 +45,6 @@ public class LengthFieldStreamingDecoder implements 
StreamingDecoder {
 
     private int requiredLength;
 
-    private InputStream dataHeader = StreamUtils.EMPTY;
-
     public LengthFieldStreamingDecoder() {
         this(4);
     }
@@ -147,16 +143,12 @@ public class LengthFieldStreamingDecoder implements 
StreamingDecoder {
     }
 
     private void processHeader() throws IOException {
-        ByteArrayOutputStream bos = new 
ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength);
         byte[] offsetData = new byte[lengthFieldOffset];
         int ignore = accumulate.read(offsetData);
-        bos.write(offsetData);
         processOffset(new ByteArrayInputStream(offsetData), lengthFieldOffset);
         byte[] lengthBytes = new byte[lengthFieldLength];
         ignore = accumulate.read(lengthBytes);
-        bos.write(lengthBytes);
         requiredLength = bytesToInt(lengthBytes);
-        this.dataHeader = new ByteArrayInputStream(bos.toByteArray());
 
         // Continue reading the frame body.
         state = DecodeState.PAYLOAD;
@@ -184,8 +176,8 @@ public class LengthFieldStreamingDecoder implements 
StreamingDecoder {
         requiredLength = lengthFieldOffset + lengthFieldLength;
     }
 
-    protected void invokeListener(InputStream inputStream) {
-        this.listener.onFragmentMessage(dataHeader, inputStream);
+    public void invokeListener(InputStream inputStream) {
+        this.listener.onFragmentMessage(inputStream);
     }
 
     protected byte[] readRawMessage(InputStream inputStream, int length) 
throws IOException {
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
index 5372eea9c0..c271ef212f 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
@@ -39,13 +39,6 @@ public interface StreamingDecoder {
          */
         void onFragmentMessage(InputStream rawMessage);
 
-        /**
-         * @param rawMessage raw message
-         */
-        default void onFragmentMessage(InputStream dataHeader, InputStream 
rawMessage) {
-            onFragmentMessage(rawMessage);
-        }
-
         default void onClose() {}
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
index 64f25fff4c..c80eb21b66 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
 import org.apache.dubbo.rpc.Invoker;
@@ -28,6 +29,8 @@ import 
org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper.TripleRequ
 import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
 import org.apache.dubbo.rpc.stub.StubSuppliers;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
 
@@ -124,9 +127,10 @@ public final class DescriptorUtils {
     }
 
     public static MethodDescriptor findTripleMethodDescriptor(
-            ServiceDescriptor serviceDescriptor, String methodName, byte[] 
data) {
+            ServiceDescriptor serviceDescriptor, String methodName, 
InputStream rawMessage) throws IOException {
         MethodDescriptor methodDescriptor = 
findReflectionMethodDescriptor(serviceDescriptor, methodName);
         if (methodDescriptor == null) {
+            byte[] data = StreamUtils.readBytes(rawMessage);
             List<MethodDescriptor> methodDescriptors = 
serviceDescriptor.getMethods(methodName);
             TripleRequestWrapper request = 
TripleRequestWrapper.parseFrom(data);
             String[] paramTypes = request.getArgTypes().toArray(new String[0]);
@@ -141,6 +145,7 @@ public final class DescriptorUtils {
             if (methodDescriptor == null) {
                 throw new UnimplementedException("method:" + methodName);
             }
+            rawMessage.reset();
         }
         return methodDescriptor;
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
index d856dfad94..1060ea2c0c 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
@@ -204,6 +204,7 @@ public abstract class 
AbstractServerTransportListener<HEADER extends RequestMeta
             methodDescriptor = DescriptorUtils.findMethodDescriptor(
                     context.getServiceDescriptor(), context.getMethodName(), 
context.isHasStub());
             context.setMethodDescriptor(methodDescriptor);
+            onSettingMethodDescriptor(methodDescriptor);
         }
         MethodMetadata methodMetadata = context.getMethodMetadata();
         if (methodMetadata == null) {
@@ -280,4 +281,6 @@ public abstract class 
AbstractServerTransportListener<HEADER extends RequestMeta
     protected void setHttpMessageListener(HttpMessageListener 
httpMessageListener) {
         this.httpMessageListener = httpMessageListener;
     }
+
+    protected void onSettingMethodDescriptor(MethodDescriptor 
methodDescriptor) {}
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index df18100379..aef98805c1 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -16,38 +16,62 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
 
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.PackableMethod;
+import org.apache.dubbo.rpc.model.PackableMethodFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.protobuf.Message;
-
-import static 
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
+import static 
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
 
 public class GrpcCompositeCodec implements HttpMessageCodec {
 
-    private final ProtobufHttpMessageCodec protobufHttpMessageCodec;
+    private static final String PACKABLE_METHOD_CACHE = 
"PACKABLE_METHOD_CACHE";
 
-    private final WrapperHttpMessageCodec wrapperHttpMessageCodec;
+    private final URL url;
 
-    public GrpcCompositeCodec(
-            ProtobufHttpMessageCodec protobufHttpMessageCodec, 
WrapperHttpMessageCodec wrapperHttpMessageCodec) {
-        this.protobufHttpMessageCodec = protobufHttpMessageCodec;
-        this.wrapperHttpMessageCodec = wrapperHttpMessageCodec;
-    }
+    private final FrameworkModel frameworkModel;
+
+    private final String mediaType;
 
-    public void setEncodeTypes(Class<?>[] encodeTypes) {
-        this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes);
+    private PackableMethod packableMethod;
+
+    public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String 
mediaType) {
+        this.url = url;
+        this.frameworkModel = frameworkModel;
+        this.mediaType = mediaType;
     }
 
-    public void setDecodeTypes(Class<?>[] decodeTypes) {
-        this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes);
+    public void loadPackableMethod(MethodDescriptor methodDescriptor) {
+        if (methodDescriptor instanceof PackableMethod) {
+            packableMethod = (PackableMethod) methodDescriptor;
+            return;
+        }
+        Map<MethodDescriptor, PackableMethod> cacheMap = 
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
+                .getServiceMetadata()
+                .getAttributeMap()
+                .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new 
ConcurrentHashMap<>());
+        packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md -> 
frameworkModel
+                .getExtensionLoader(PackableMethodFactory.class)
+                
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
+                        .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
+                .create(methodDescriptor, url, mediaType));
     }
 
     @Override
@@ -58,34 +82,38 @@ public class GrpcCompositeCodec implements HttpMessageCodec 
{
         try {
             int compressed = 0;
             outputStream.write(compressed);
-            if (isProtobuf(data)) {
-                ProtobufWriter.write(protobufHttpMessageCodec, outputStream, 
data);
-                return;
-            }
-            // wrapper
-            wrapperHttpMessageCodec.encode(outputStream, data);
-        } catch (IOException e) {
+            byte[] bytes = packableMethod.packResponse(data);
+            writeLength(outputStream, bytes.length);
+            outputStream.write(bytes);
+        } catch (HttpStatusException e) {
+            throw e;
+        } catch (Exception e) {
             throw new EncodeException(e);
         }
     }
 
     @Override
     public Object decode(InputStream inputStream, Class<?> targetType, Charset 
charset) throws DecodeException {
-        if (isProtoClass(targetType)) {
-            return protobufHttpMessageCodec.decode(inputStream, targetType, 
charset);
+        try {
+            byte[] data = StreamUtils.readBytes(inputStream);
+            return packableMethod.parseRequest(data);
+        } catch (HttpStatusException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new DecodeException(e);
         }
-        return wrapperHttpMessageCodec.decode(inputStream, targetType, 
charset);
     }
 
     @Override
     public Object[] decode(InputStream inputStream, Class<?>[] targetTypes, 
Charset charset) throws DecodeException {
-        if (targetTypes.length > 1) {
-            return wrapperHttpMessageCodec.decode(inputStream, targetTypes, 
charset);
+        Object message = decode(inputStream, ArrayUtils.isEmpty(targetTypes) ? 
null : targetTypes[0], charset);
+        if (message instanceof Object[]) {
+            return (Object[]) message;
         }
-        return HttpMessageCodec.super.decode(inputStream, targetTypes, 
charset);
+        return new Object[] {message};
     }
 
-    private static void writeLength(OutputStream outputStream, int length) {
+    private void writeLength(OutputStream outputStream, int length) {
         try {
             outputStream.write(((length >> 24) & 0xFF));
             outputStream.write(((length >> 16) & 0xFF));
@@ -100,39 +128,4 @@ public class GrpcCompositeCodec implements 
HttpMessageCodec {
     public MediaType mediaType() {
         return MediaType.APPLICATION_GRPC;
     }
-
-    private static boolean isProtobuf(Object data) {
-        if (data == null) {
-            return false;
-        }
-        return isProtoClass(data.getClass());
-    }
-
-    private static boolean isProtoClass(Class<?> clazz) {
-        while (clazz != Object.class && clazz != null) {
-            Class<?>[] interfaces = clazz.getInterfaces();
-            if (interfaces.length > 0) {
-                for (Class<?> clazzInterface : interfaces) {
-                    if 
(PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
-                        return true;
-                    }
-                }
-            }
-            clazz = clazz.getSuperclass();
-        }
-        return false;
-    }
-
-    /**
-     * lazy init protobuf class
-     */
-    private static class ProtobufWriter {
-
-        private static void write(HttpMessageCodec codec, OutputStream 
outputStream, Object data) {
-            int serializedSize = ((Message) data).getSerializedSize();
-            // write length
-            writeLength(outputStream, serializedSize);
-            codec.encode(outputStream, data);
-        }
-    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
index 553c661cc0..1fa6842b7d 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
@@ -22,7 +22,6 @@ import 
org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.HttpMessageDecoderFactory;
 import org.apache.dubbo.remoting.http12.message.HttpMessageEncoderFactory;
 import org.apache.dubbo.remoting.http12.message.MediaType;
-import org.apache.dubbo.remoting.utils.UrlUtils;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 
 @Activate
@@ -30,11 +29,7 @@ public class GrpcCompositeCodecFactory implements 
HttpMessageEncoderFactory, Htt
 
     @Override
     public HttpMessageCodec createCodec(URL url, FrameworkModel 
frameworkModel, String mediaType) {
-        String serializeName = UrlUtils.serializationOrDefault(url);
-        WrapperHttpMessageCodec wrapperHttpMessageCodec = new 
WrapperHttpMessageCodec(url, frameworkModel);
-        wrapperHttpMessageCodec.setSerializeType(serializeName);
-        ProtobufHttpMessageCodec protobufHttpMessageCodec = new 
ProtobufHttpMessageCodec();
-        return new GrpcCompositeCodec(protobufHttpMessageCodec, 
wrapperHttpMessageCodec);
+        return new GrpcCompositeCodec(url, frameworkModel, mediaType);
     }
 
     @Override
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 9a7a4828c5..3aa1bc0ac7 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.remoting.http12.HttpHeaders;
@@ -28,11 +27,11 @@ import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.h2.Http2Header;
 import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
 import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
-import org.apache.dubbo.remoting.http12.message.MethodMetadata;
 import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.protocol.tri.DescriptorUtils;
 import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext;
 import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
@@ -40,8 +39,6 @@ import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
 import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener;
 import 
org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListener;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -138,6 +135,14 @@ public class GrpcHttp2ServerTransportListener extends 
GenericHttp2ServerTranspor
         return (GrpcStreamingDecoder) super.getStreamingDecoder();
     }
 
+    @Override
+    protected void onSettingMethodDescriptor(MethodDescriptor 
methodDescriptor) {
+        GrpcCompositeCodec grpcCompositeCodec =
+                (GrpcCompositeCodec) getContext().getHttpMessageDecoder();
+        grpcCompositeCodec.loadPackableMethod(methodDescriptor);
+        super.onSettingMethodDescriptor(methodDescriptor);
+    }
+
     private class LazyFindMethodListener implements HttpMessageListener {
 
         private final StreamingDecoder streamingDecoder;
@@ -156,39 +161,25 @@ public class GrpcHttp2ServerTransportListener extends 
GenericHttp2ServerTranspor
 
     private class DetermineMethodDescriptorListener implements 
StreamingDecoder.FragmentListener {
 
-        @Override
-        public void onFragmentMessage(InputStream rawMessage) {}
-
         @Override
         public void onClose() {
             getStreamingDecoder().close();
         }
 
         @Override
-        public void onFragmentMessage(InputStream dataHeader, InputStream 
rawMessage) {
+        public void onFragmentMessage(InputStream rawMessage) {
             try {
-                ByteArrayOutputStream merged =
-                        new ByteArrayOutputStream(dataHeader.available() + 
rawMessage.available());
-                StreamUtils.copy(dataHeader, merged);
-                byte[] data = StreamUtils.readBytes(rawMessage);
-
                 RpcInvocationBuildContext context = getContext();
                 if (null == context.getMethodDescriptor()) {
-                    
context.setMethodDescriptor(DescriptorUtils.findTripleMethodDescriptor(
-                            context.getServiceDescriptor(), 
context.getMethodName(), data));
+                    MethodDescriptor methodDescriptor = 
DescriptorUtils.findTripleMethodDescriptor(
+                            context.getServiceDescriptor(), 
context.getMethodName(), rawMessage);
+                    context.setMethodDescriptor(methodDescriptor);
+                    onSettingMethodDescriptor(methodDescriptor);
 
                     
setHttpMessageListener(GrpcHttp2ServerTransportListener.super.buildHttpMessageListener());
-
-                    // replace decoder
-                    GrpcCompositeCodec grpcCompositeCodec = 
(GrpcCompositeCodec) context.getHttpMessageDecoder();
-                    MethodMetadata methodMetadata = 
context.getMethodMetadata();
-                    
grpcCompositeCodec.setDecodeTypes(methodMetadata.getActualRequestTypes());
-                    grpcCompositeCodec.setEncodeTypes(new Class[] 
{methodMetadata.getActualResponseType()});
-                    
getServerChannelObserver().setResponseEncoder(grpcCompositeCodec);
                 }
 
-                merged.write(data);
-                getHttpMessageListener().onMessage(new 
ByteArrayInputStream(merged.toByteArray()));
+                getStreamingDecoder().invokeListener(rawMessage);
             } catch (IOException e) {
                 throw new DecodeException(e);
             }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java
deleted file mode 100644
index 81de958115..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
-
-import org.apache.dubbo.remoting.http12.exception.DecodeException;
-import org.apache.dubbo.remoting.http12.exception.EncodeException;
-import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
-import org.apache.dubbo.remoting.http12.message.MediaType;
-import org.apache.dubbo.rpc.protocol.tri.SingleProtobufUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-
-public class ProtobufHttpMessageCodec implements HttpMessageCodec {
-
-    private static final MediaType MEDIA_TYPE = new MediaType("application", 
"x-protobuf");
-
-    @Override
-    public void encode(OutputStream outputStream, Object data, Charset 
charset) throws EncodeException {
-        try {
-            SingleProtobufUtils.serialize(data, outputStream);
-        } catch (IOException e) {
-            throw new EncodeException(e);
-        }
-    }
-
-    @Override
-    public Object decode(InputStream inputStream, Class<?> targetType, Charset 
charset) throws DecodeException {
-        try {
-            return SingleProtobufUtils.deserialize(inputStream, targetType);
-        } catch (IOException e) {
-            throw new DecodeException(e);
-        }
-    }
-
-    @Override
-    public MediaType mediaType() {
-        return MEDIA_TYPE;
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java
deleted file mode 100644
index f7993c1ba8..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.serialize.MultipleSerialization;
-import org.apache.dubbo.config.Constants;
-import org.apache.dubbo.remoting.http12.exception.DecodeException;
-import org.apache.dubbo.remoting.http12.exception.EncodeException;
-import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
-import org.apache.dubbo.remoting.http12.message.MediaType;
-import org.apache.dubbo.remoting.transport.CodecSupport;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
-import org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-
-public class WrapperHttpMessageCodec implements HttpMessageCodec {
-
-    private static final MediaType MEDIA_TYPE = new MediaType("application", 
"triple+wrapper");
-
-    private static final String DEFAULT_SERIALIZE_TYPE = "fastjson2";
-
-    private final MultipleSerialization serialization;
-
-    private final URL url;
-
-    private Class<?>[] encodeTypes;
-
-    private Class<?>[] decodeTypes;
-
-    private String serializeType = DEFAULT_SERIALIZE_TYPE;
-
-    public WrapperHttpMessageCodec(URL url, FrameworkModel frameworkModel) {
-        this.url = url;
-        this.serialization = frameworkModel
-                .getExtensionLoader(MultipleSerialization.class)
-                
.getExtension(url.getParameter(Constants.MULTI_SERIALIZATION_KEY, 
CommonConstants.DEFAULT_KEY));
-    }
-
-    public void setSerializeType(String serializeType) {
-        this.serializeType = serializeType;
-    }
-
-    public void setEncodeTypes(Class<?>[] encodeTypes) {
-        this.encodeTypes = encodeTypes;
-    }
-
-    public void setDecodeTypes(Class<?>[] decodeTypes) {
-        this.decodeTypes = decodeTypes;
-    }
-
-    @Override
-    public void encode(OutputStream outputStream, Object data, Charset 
charset) throws EncodeException {
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            serialization.serialize(url, serializeType, encodeTypes[0], data, 
bos);
-            byte[] encoded = 
TripleCustomerProtocolWapper.TripleResponseWrapper.Builder.newBuilder()
-                    .setSerializeType(serializeType)
-                    .setType(encodeTypes[0].getName())
-                    .setData(bos.toByteArray())
-                    .build()
-                    .toByteArray();
-            writeLength(outputStream, encoded.length);
-            outputStream.write(encoded);
-        } catch (IOException e) {
-            throw new EncodeException(e);
-        }
-    }
-
-    @Override
-    public void encode(OutputStream outputStream, Object[] data, Charset 
charset) throws EncodeException {
-        // TODO
-    }
-
-    @Override
-    public Object decode(InputStream inputStream, Class<?> targetType, Charset 
charset) throws DecodeException {
-        Object[] decode = this.decode(inputStream, new Class[] {targetType}, 
charset);
-        if (decode == null || decode.length == 0) {
-            return null;
-        }
-        return decode[0];
-    }
-
-    @Override
-    public Object[] decode(InputStream inputStream, Class<?>[] targetTypes, 
Charset charset) throws DecodeException {
-        try {
-            int len;
-            byte[] data = new byte[4096];
-            ByteArrayOutputStream bos = new ByteArrayOutputStream(4096);
-            while ((len = inputStream.read(data)) != -1) {
-                bos.write(data, 0, len);
-            }
-            TripleCustomerProtocolWapper.TripleRequestWrapper wrapper =
-                    
TripleCustomerProtocolWapper.TripleRequestWrapper.parseFrom(bos.toByteArray());
-            final String serializeType = 
convertHessianFromWrapper(wrapper.getSerializeType());
-            CodecSupport.checkSerialization(serializeType, url);
-            setSerializeType(wrapper.getSerializeType());
-            Object[] ret = new Object[wrapper.getArgs().size()];
-            for (int i = 0; i < wrapper.getArgs().size(); i++) {
-                ByteArrayInputStream in =
-                        new ByteArrayInputStream(wrapper.getArgs().get(i));
-                try {
-                    ret[i] = this.serialization.deserialize(url, 
wrapper.getSerializeType(), targetTypes[i], in);
-                } catch (ClassNotFoundException e) {
-                    throw new DecodeException(e);
-                }
-            }
-            return ret;
-        } catch (IOException e) {
-            throw new DecodeException(e);
-        }
-    }
-
-    @Override
-    public MediaType mediaType() {
-        return MEDIA_TYPE;
-    }
-
-    private static void writeLength(OutputStream outputStream, int length) 
throws IOException {
-        outputStream.write(((length >> 24) & 0xFF));
-        outputStream.write(((length >> 16) & 0xFF));
-        outputStream.write(((length >> 8) & 0xFF));
-        outputStream.write((length & 0xFF));
-    }
-
-    private static String convertHessianFromWrapper(String serializeType) {
-        if (TripleConstant.HESSIAN4.equals(serializeType)) {
-            return TripleConstant.HESSIAN2;
-        }
-        return serializeType;
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 382df659de..16e44fd033 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -205,7 +205,9 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
     @Override
     public void cancelByRemote(long errorCode) {
         
serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));
-        serverCallListener.onCancel(errorCode);
+        if (serverCallListener != null) {
+            serverCallListener.onCancel(errorCode);
+        }
     }
 
     protected StreamingDecoder getStreamingDecoder() {

Reply via email to