This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 935849f4ce Optimize the decoding of generic http2 (#14175)
935849f4ce is described below
commit 935849f4ce0eed1893869711fe42b84d22e31c84
Author: TomlongTK <[email protected]>
AuthorDate: Thu May 23 11:17:31 2024 +0800
Optimize the decoding of generic http2 (#14175)
* Optimize the decoding of generic http2
* Decode on close
* Clean up netty residual memory when stream is closed
---
...ngDecoder.java => DefaultStreamingDecoder.java} | 37 +++++++++++++++++++---
.../h12/grpc/GrpcHttp2ServerTransportListener.java | 14 --------
.../http2/GenericHttp2ServerTransportListener.java | 18 +++++++++--
3 files changed, 47 insertions(+), 22 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java
similarity index 59%
rename from
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java
rename to
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java
index 473e6a2ed0..939bbefc74 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java
@@ -16,13 +16,19 @@
*/
package org.apache.dubbo.remoting.http12.message;
+import org.apache.dubbo.remoting.http12.CompositeInputStream;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
+import java.io.IOException;
import java.io.InputStream;
-public class NoOpStreamingDecoder implements StreamingDecoder {
+public class DefaultStreamingDecoder implements StreamingDecoder {
- private FragmentListener listener;
+ private boolean closed;
+
+ protected final CompositeInputStream accumulate = new
CompositeInputStream();
+
+ protected FragmentListener listener;
@Override
public void request(int numMessages) {
@@ -31,17 +37,38 @@ public class NoOpStreamingDecoder implements
StreamingDecoder {
@Override
public void decode(InputStream inputStream) throws DecodeException {
- listener.onFragmentMessage(inputStream);
+ if (closed) {
+ // ignored
+ return;
+ }
+ accumulate.addInputStream(inputStream);
}
@Override
public void close() {
- this.listener.onClose();
+ try {
+ if (!closed) {
+ closed = true;
+ listener.onFragmentMessage(accumulate);
+ accumulate.close();
+ listener.onClose();
+ }
+ } catch (IOException e) {
+ throw new DecodeException(e);
+ }
}
@Override
public void onStreamClosed() {
- // do nothing
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ accumulate.close();
+ } catch (IOException e) {
+ throw new DecodeException(e);
+ }
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 0399c64334..9ae7881302 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -26,7 +26,6 @@ import
org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
-import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
@@ -123,19 +122,6 @@ public class GrpcHttp2ServerTransportListener extends
GenericHttp2ServerTranspor
return invocation;
}
- @Override
- protected void onError(Http2InputMessage message, Throwable throwable) {
- try {
- message.close();
- } catch (Exception e) {
- throwable.addSuppressed(e);
- }
- onError(throwable);
- }
-
- @Override
- protected void onFinally(Http2InputMessage message) {}
-
@Override
protected GrpcStreamingDecoder getStreamingDecoder() {
return (GrpcStreamingDecoder) super.getStreamingDecoder();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 5be8bfcf68..9db2971837 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -28,9 +28,9 @@ import
org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
+import org.apache.dubbo.remoting.http12.message.DefaultStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.ListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
-import org.apache.dubbo.remoting.http12.message.NoOpStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
import org.apache.dubbo.rpc.CancellationContext;
@@ -77,8 +77,7 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
}
protected StreamingDecoder newStreamingDecoder() {
- // default no op
- return new NoOpStreamingDecoder();
+ return new DefaultStreamingDecoder();
}
@Override
@@ -174,6 +173,19 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
serverChannelObserver.onError(throwable);
}
+ @Override
+ protected void onError(Http2InputMessage message, Throwable throwable) {
+ try {
+ message.close();
+ } catch (Exception e) {
+ throwable.addSuppressed(e);
+ }
+ onError(throwable);
+ }
+
+ @Override
+ protected void onFinally(Http2InputMessage message) {}
+
@Override
public void cancelByRemote(long errorCode) {
serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));