This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 638f37538f Restores PackableMethod serialization mode (#14323)
638f37538f is described below
commit 638f37538f074c4eba80311b6619a89f6b9ffd93
Author: TomlongTK <[email protected]>
AuthorDate: Tue Jul 23 11:27:15 2024 +0800
Restores PackableMethod serialization mode (#14323)
* Restore the serialization mode of PackableMethod
* Add a custom hook that is invoked when the method descriptor is set
* Format code
---------
Co-authored-by: earthchen <[email protected]>
---
.../http12/h2/Http2ServerChannelObserver.java | 4 +-
.../message/LengthFieldStreamingDecoder.java | 12 +-
.../remoting/http12/message/StreamingDecoder.java | 7 -
.../dubbo/rpc/protocol/tri/DescriptorUtils.java | 7 +-
.../tri/h12/AbstractServerTransportListener.java | 3 +
.../protocol/tri/h12/grpc/GrpcCompositeCodec.java | 119 ++++++++--------
.../tri/h12/grpc/GrpcCompositeCodecFactory.java | 7 +-
.../h12/grpc/GrpcHttp2ServerTransportListener.java | 39 ++----
.../tri/h12/grpc/ProtobufHttpMessageCodec.java | 56 --------
.../tri/h12/grpc/WrapperHttpMessageCodec.java | 154 ---------------------
.../http2/GenericHttp2ServerTransportListener.java | 4 +-
11 files changed, 89 insertions(+), 323 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index bd43f2ad92..c3c8f8c9a8 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -79,7 +79,9 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
closed();
}
}
- this.cancellationContext.cancel(throwable);
+ if (cancellationContext != null) {
+ cancellationContext.cancel(throwable);
+ }
long errorCode = 0;
if (throwable instanceof ErrorCodeHolder) {
errorCode = ((ErrorCodeHolder) throwable).getErrorCode();
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index 5272a701be..77821ea431 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -16,12 +16,10 @@
*/
package org.apache.dubbo.remoting.http12.message;
-import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.CompositeInputStream;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -47,8 +45,6 @@ public class LengthFieldStreamingDecoder implements
StreamingDecoder {
private int requiredLength;
- private InputStream dataHeader = StreamUtils.EMPTY;
-
public LengthFieldStreamingDecoder() {
this(4);
}
@@ -147,16 +143,12 @@ public class LengthFieldStreamingDecoder implements
StreamingDecoder {
}
private void processHeader() throws IOException {
- ByteArrayOutputStream bos = new
ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength);
byte[] offsetData = new byte[lengthFieldOffset];
int ignore = accumulate.read(offsetData);
- bos.write(offsetData);
processOffset(new ByteArrayInputStream(offsetData), lengthFieldOffset);
byte[] lengthBytes = new byte[lengthFieldLength];
ignore = accumulate.read(lengthBytes);
- bos.write(lengthBytes);
requiredLength = bytesToInt(lengthBytes);
- this.dataHeader = new ByteArrayInputStream(bos.toByteArray());
// Continue reading the frame body.
state = DecodeState.PAYLOAD;
@@ -184,8 +176,8 @@ public class LengthFieldStreamingDecoder implements
StreamingDecoder {
requiredLength = lengthFieldOffset + lengthFieldLength;
}
- protected void invokeListener(InputStream inputStream) {
- this.listener.onFragmentMessage(dataHeader, inputStream);
+ public void invokeListener(InputStream inputStream) {
+ this.listener.onFragmentMessage(inputStream);
}
protected byte[] readRawMessage(InputStream inputStream, int length)
throws IOException {
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
index 5372eea9c0..c271ef212f 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java
@@ -39,13 +39,6 @@ public interface StreamingDecoder {
*/
void onFragmentMessage(InputStream rawMessage);
- /**
- * @param rawMessage raw message
- */
- default void onFragmentMessage(InputStream dataHeader, InputStream
rawMessage) {
- onFragmentMessage(rawMessage);
- }
-
default void onClose() {}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
index 64f25fff4c..c80eb21b66 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.rpc.Invoker;
@@ -28,6 +29,8 @@ import
org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper.TripleRequ
import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
import org.apache.dubbo.rpc.stub.StubSuppliers;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
@@ -124,9 +127,10 @@ public final class DescriptorUtils {
}
public static MethodDescriptor findTripleMethodDescriptor(
- ServiceDescriptor serviceDescriptor, String methodName, byte[]
data) {
+ ServiceDescriptor serviceDescriptor, String methodName,
InputStream rawMessage) throws IOException {
MethodDescriptor methodDescriptor =
findReflectionMethodDescriptor(serviceDescriptor, methodName);
if (methodDescriptor == null) {
+ byte[] data = StreamUtils.readBytes(rawMessage);
List<MethodDescriptor> methodDescriptors =
serviceDescriptor.getMethods(methodName);
TripleRequestWrapper request =
TripleRequestWrapper.parseFrom(data);
String[] paramTypes = request.getArgTypes().toArray(new String[0]);
@@ -141,6 +145,7 @@ public final class DescriptorUtils {
if (methodDescriptor == null) {
throw new UnimplementedException("method:" + methodName);
}
+ rawMessage.reset();
}
return methodDescriptor;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
index d856dfad94..1060ea2c0c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
@@ -204,6 +204,7 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
methodDescriptor = DescriptorUtils.findMethodDescriptor(
context.getServiceDescriptor(), context.getMethodName(),
context.isHasStub());
context.setMethodDescriptor(methodDescriptor);
+ onSettingMethodDescriptor(methodDescriptor);
}
MethodMetadata methodMetadata = context.getMethodMetadata();
if (methodMetadata == null) {
@@ -280,4 +281,6 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
protected void setHttpMessageListener(HttpMessageListener
httpMessageListener) {
this.httpMessageListener = httpMessageListener;
}
+
+ protected void onSettingMethodDescriptor(MethodDescriptor
methodDescriptor) {}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index df18100379..aef98805c1 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -16,38 +16,62 @@
*/
package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.PackableMethod;
+import org.apache.dubbo.rpc.model.PackableMethodFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-import com.google.protobuf.Message;
-
-import static
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
public class GrpcCompositeCodec implements HttpMessageCodec {
- private final ProtobufHttpMessageCodec protobufHttpMessageCodec;
+ private static final String PACKABLE_METHOD_CACHE =
"PACKABLE_METHOD_CACHE";
- private final WrapperHttpMessageCodec wrapperHttpMessageCodec;
+ private final URL url;
- public GrpcCompositeCodec(
- ProtobufHttpMessageCodec protobufHttpMessageCodec,
WrapperHttpMessageCodec wrapperHttpMessageCodec) {
- this.protobufHttpMessageCodec = protobufHttpMessageCodec;
- this.wrapperHttpMessageCodec = wrapperHttpMessageCodec;
- }
+ private final FrameworkModel frameworkModel;
+
+ private final String mediaType;
- public void setEncodeTypes(Class<?>[] encodeTypes) {
- this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes);
+ private PackableMethod packableMethod;
+
+ public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String
mediaType) {
+ this.url = url;
+ this.frameworkModel = frameworkModel;
+ this.mediaType = mediaType;
}
- public void setDecodeTypes(Class<?>[] decodeTypes) {
- this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes);
+ public void loadPackableMethod(MethodDescriptor methodDescriptor) {
+ if (methodDescriptor instanceof PackableMethod) {
+ packableMethod = (PackableMethod) methodDescriptor;
+ return;
+ }
+ Map<MethodDescriptor, PackableMethod> cacheMap =
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
+ .getServiceMetadata()
+ .getAttributeMap()
+ .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new
ConcurrentHashMap<>());
+ packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md ->
frameworkModel
+ .getExtensionLoader(PackableMethodFactory.class)
+
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
+ .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
+ .create(methodDescriptor, url, mediaType));
}
@Override
@@ -58,34 +82,38 @@ public class GrpcCompositeCodec implements HttpMessageCodec
{
try {
int compressed = 0;
outputStream.write(compressed);
- if (isProtobuf(data)) {
- ProtobufWriter.write(protobufHttpMessageCodec, outputStream,
data);
- return;
- }
- // wrapper
- wrapperHttpMessageCodec.encode(outputStream, data);
- } catch (IOException e) {
+ byte[] bytes = packableMethod.packResponse(data);
+ writeLength(outputStream, bytes.length);
+ outputStream.write(bytes);
+ } catch (HttpStatusException e) {
+ throw e;
+ } catch (Exception e) {
throw new EncodeException(e);
}
}
@Override
public Object decode(InputStream inputStream, Class<?> targetType, Charset
charset) throws DecodeException {
- if (isProtoClass(targetType)) {
- return protobufHttpMessageCodec.decode(inputStream, targetType,
charset);
+ try {
+ byte[] data = StreamUtils.readBytes(inputStream);
+ return packableMethod.parseRequest(data);
+ } catch (HttpStatusException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DecodeException(e);
}
- return wrapperHttpMessageCodec.decode(inputStream, targetType,
charset);
}
@Override
public Object[] decode(InputStream inputStream, Class<?>[] targetTypes,
Charset charset) throws DecodeException {
- if (targetTypes.length > 1) {
- return wrapperHttpMessageCodec.decode(inputStream, targetTypes,
charset);
+ Object message = decode(inputStream, ArrayUtils.isEmpty(targetTypes) ?
null : targetTypes[0], charset);
+ if (message instanceof Object[]) {
+ return (Object[]) message;
}
- return HttpMessageCodec.super.decode(inputStream, targetTypes,
charset);
+ return new Object[] {message};
}
- private static void writeLength(OutputStream outputStream, int length) {
+ private void writeLength(OutputStream outputStream, int length) {
try {
outputStream.write(((length >> 24) & 0xFF));
outputStream.write(((length >> 16) & 0xFF));
@@ -100,39 +128,4 @@ public class GrpcCompositeCodec implements
HttpMessageCodec {
public MediaType mediaType() {
return MediaType.APPLICATION_GRPC;
}
-
- private static boolean isProtobuf(Object data) {
- if (data == null) {
- return false;
- }
- return isProtoClass(data.getClass());
- }
-
- private static boolean isProtoClass(Class<?> clazz) {
- while (clazz != Object.class && clazz != null) {
- Class<?>[] interfaces = clazz.getInterfaces();
- if (interfaces.length > 0) {
- for (Class<?> clazzInterface : interfaces) {
- if
(PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
- return true;
- }
- }
- }
- clazz = clazz.getSuperclass();
- }
- return false;
- }
-
- /**
- * lazy init protobuf class
- */
- private static class ProtobufWriter {
-
- private static void write(HttpMessageCodec codec, OutputStream
outputStream, Object data) {
- int serializedSize = ((Message) data).getSerializedSize();
- // write length
- writeLength(outputStream, serializedSize);
- codec.encode(outputStream, data);
- }
- }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
index 553c661cc0..1fa6842b7d 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java
@@ -22,7 +22,6 @@ import
org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.HttpMessageDecoderFactory;
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoderFactory;
import org.apache.dubbo.remoting.http12.message.MediaType;
-import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
@Activate
@@ -30,11 +29,7 @@ public class GrpcCompositeCodecFactory implements
HttpMessageEncoderFactory, Htt
@Override
public HttpMessageCodec createCodec(URL url, FrameworkModel
frameworkModel, String mediaType) {
- String serializeName = UrlUtils.serializationOrDefault(url);
- WrapperHttpMessageCodec wrapperHttpMessageCodec = new
WrapperHttpMessageCodec(url, frameworkModel);
- wrapperHttpMessageCodec.setSerializeType(serializeName);
- ProtobufHttpMessageCodec protobufHttpMessageCodec = new
ProtobufHttpMessageCodec();
- return new GrpcCompositeCodec(protobufHttpMessageCodec,
wrapperHttpMessageCodec);
+ return new GrpcCompositeCodec(url, frameworkModel, mediaType);
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 9a7a4828c5..3aa1bc0ac7 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.http12.HttpHeaders;
@@ -28,11 +27,11 @@ import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
-import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.protocol.tri.DescriptorUtils;
import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
@@ -40,8 +39,6 @@ import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener;
import
org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListener;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -138,6 +135,14 @@ public class GrpcHttp2ServerTransportListener extends
GenericHttp2ServerTranspor
return (GrpcStreamingDecoder) super.getStreamingDecoder();
}
+ @Override
+ protected void onSettingMethodDescriptor(MethodDescriptor
methodDescriptor) {
+ GrpcCompositeCodec grpcCompositeCodec =
+ (GrpcCompositeCodec) getContext().getHttpMessageDecoder();
+ grpcCompositeCodec.loadPackableMethod(methodDescriptor);
+ super.onSettingMethodDescriptor(methodDescriptor);
+ }
+
private class LazyFindMethodListener implements HttpMessageListener {
private final StreamingDecoder streamingDecoder;
@@ -156,39 +161,25 @@ public class GrpcHttp2ServerTransportListener extends
GenericHttp2ServerTranspor
private class DetermineMethodDescriptorListener implements
StreamingDecoder.FragmentListener {
- @Override
- public void onFragmentMessage(InputStream rawMessage) {}
-
@Override
public void onClose() {
getStreamingDecoder().close();
}
@Override
- public void onFragmentMessage(InputStream dataHeader, InputStream
rawMessage) {
+ public void onFragmentMessage(InputStream rawMessage) {
try {
- ByteArrayOutputStream merged =
- new ByteArrayOutputStream(dataHeader.available() +
rawMessage.available());
- StreamUtils.copy(dataHeader, merged);
- byte[] data = StreamUtils.readBytes(rawMessage);
-
RpcInvocationBuildContext context = getContext();
if (null == context.getMethodDescriptor()) {
-
context.setMethodDescriptor(DescriptorUtils.findTripleMethodDescriptor(
- context.getServiceDescriptor(),
context.getMethodName(), data));
+ MethodDescriptor methodDescriptor =
DescriptorUtils.findTripleMethodDescriptor(
+ context.getServiceDescriptor(),
context.getMethodName(), rawMessage);
+ context.setMethodDescriptor(methodDescriptor);
+ onSettingMethodDescriptor(methodDescriptor);
setHttpMessageListener(GrpcHttp2ServerTransportListener.super.buildHttpMessageListener());
-
- // replace decoder
- GrpcCompositeCodec grpcCompositeCodec =
(GrpcCompositeCodec) context.getHttpMessageDecoder();
- MethodMetadata methodMetadata =
context.getMethodMetadata();
-
grpcCompositeCodec.setDecodeTypes(methodMetadata.getActualRequestTypes());
- grpcCompositeCodec.setEncodeTypes(new Class[]
{methodMetadata.getActualResponseType()});
-
getServerChannelObserver().setResponseEncoder(grpcCompositeCodec);
}
- merged.write(data);
- getHttpMessageListener().onMessage(new
ByteArrayInputStream(merged.toByteArray()));
+ getStreamingDecoder().invokeListener(rawMessage);
} catch (IOException e) {
throw new DecodeException(e);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java
deleted file mode 100644
index 81de958115..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
-
-import org.apache.dubbo.remoting.http12.exception.DecodeException;
-import org.apache.dubbo.remoting.http12.exception.EncodeException;
-import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
-import org.apache.dubbo.remoting.http12.message.MediaType;
-import org.apache.dubbo.rpc.protocol.tri.SingleProtobufUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-
-public class ProtobufHttpMessageCodec implements HttpMessageCodec {
-
- private static final MediaType MEDIA_TYPE = new MediaType("application",
"x-protobuf");
-
- @Override
- public void encode(OutputStream outputStream, Object data, Charset
charset) throws EncodeException {
- try {
- SingleProtobufUtils.serialize(data, outputStream);
- } catch (IOException e) {
- throw new EncodeException(e);
- }
- }
-
- @Override
- public Object decode(InputStream inputStream, Class<?> targetType, Charset
charset) throws DecodeException {
- try {
- return SingleProtobufUtils.deserialize(inputStream, targetType);
- } catch (IOException e) {
- throw new DecodeException(e);
- }
- }
-
- @Override
- public MediaType mediaType() {
- return MEDIA_TYPE;
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java
deleted file mode 100644
index f7993c1ba8..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.serialize.MultipleSerialization;
-import org.apache.dubbo.config.Constants;
-import org.apache.dubbo.remoting.http12.exception.DecodeException;
-import org.apache.dubbo.remoting.http12.exception.EncodeException;
-import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
-import org.apache.dubbo.remoting.http12.message.MediaType;
-import org.apache.dubbo.remoting.transport.CodecSupport;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
-import org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-
-public class WrapperHttpMessageCodec implements HttpMessageCodec {
-
- private static final MediaType MEDIA_TYPE = new MediaType("application",
"triple+wrapper");
-
- private static final String DEFAULT_SERIALIZE_TYPE = "fastjson2";
-
- private final MultipleSerialization serialization;
-
- private final URL url;
-
- private Class<?>[] encodeTypes;
-
- private Class<?>[] decodeTypes;
-
- private String serializeType = DEFAULT_SERIALIZE_TYPE;
-
- public WrapperHttpMessageCodec(URL url, FrameworkModel frameworkModel) {
- this.url = url;
- this.serialization = frameworkModel
- .getExtensionLoader(MultipleSerialization.class)
-
.getExtension(url.getParameter(Constants.MULTI_SERIALIZATION_KEY,
CommonConstants.DEFAULT_KEY));
- }
-
- public void setSerializeType(String serializeType) {
- this.serializeType = serializeType;
- }
-
- public void setEncodeTypes(Class<?>[] encodeTypes) {
- this.encodeTypes = encodeTypes;
- }
-
- public void setDecodeTypes(Class<?>[] decodeTypes) {
- this.decodeTypes = decodeTypes;
- }
-
- @Override
- public void encode(OutputStream outputStream, Object data, Charset
charset) throws EncodeException {
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- serialization.serialize(url, serializeType, encodeTypes[0], data,
bos);
- byte[] encoded =
TripleCustomerProtocolWapper.TripleResponseWrapper.Builder.newBuilder()
- .setSerializeType(serializeType)
- .setType(encodeTypes[0].getName())
- .setData(bos.toByteArray())
- .build()
- .toByteArray();
- writeLength(outputStream, encoded.length);
- outputStream.write(encoded);
- } catch (IOException e) {
- throw new EncodeException(e);
- }
- }
-
- @Override
- public void encode(OutputStream outputStream, Object[] data, Charset
charset) throws EncodeException {
- // TODO
- }
-
- @Override
- public Object decode(InputStream inputStream, Class<?> targetType, Charset
charset) throws DecodeException {
- Object[] decode = this.decode(inputStream, new Class[] {targetType},
charset);
- if (decode == null || decode.length == 0) {
- return null;
- }
- return decode[0];
- }
-
- @Override
- public Object[] decode(InputStream inputStream, Class<?>[] targetTypes,
Charset charset) throws DecodeException {
- try {
- int len;
- byte[] data = new byte[4096];
- ByteArrayOutputStream bos = new ByteArrayOutputStream(4096);
- while ((len = inputStream.read(data)) != -1) {
- bos.write(data, 0, len);
- }
- TripleCustomerProtocolWapper.TripleRequestWrapper wrapper =
-
TripleCustomerProtocolWapper.TripleRequestWrapper.parseFrom(bos.toByteArray());
- final String serializeType =
convertHessianFromWrapper(wrapper.getSerializeType());
- CodecSupport.checkSerialization(serializeType, url);
- setSerializeType(wrapper.getSerializeType());
- Object[] ret = new Object[wrapper.getArgs().size()];
- for (int i = 0; i < wrapper.getArgs().size(); i++) {
- ByteArrayInputStream in =
- new ByteArrayInputStream(wrapper.getArgs().get(i));
- try {
- ret[i] = this.serialization.deserialize(url,
wrapper.getSerializeType(), targetTypes[i], in);
- } catch (ClassNotFoundException e) {
- throw new DecodeException(e);
- }
- }
- return ret;
- } catch (IOException e) {
- throw new DecodeException(e);
- }
- }
-
- @Override
- public MediaType mediaType() {
- return MEDIA_TYPE;
- }
-
- private static void writeLength(OutputStream outputStream, int length)
throws IOException {
- outputStream.write(((length >> 24) & 0xFF));
- outputStream.write(((length >> 16) & 0xFF));
- outputStream.write(((length >> 8) & 0xFF));
- outputStream.write((length & 0xFF));
- }
-
- private static String convertHessianFromWrapper(String serializeType) {
- if (TripleConstant.HESSIAN4.equals(serializeType)) {
- return TripleConstant.HESSIAN2;
- }
- return serializeType;
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 382df659de..16e44fd033 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -205,7 +205,9 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
@Override
public void cancelByRemote(long errorCode) {
serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));
- serverCallListener.onCancel(errorCode);
+ if (serverCallListener != null) {
+ serverCallListener.onCancel(errorCode);
+ }
}
protected StreamingDecoder getStreamingDecoder() {