This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 5d5d0365b5 Triple http limiting the size of the HTTP request and
response (#14246)
5d5d0365b5 is described below
commit 5d5d0365b54f842cda0952f3199e8924129f21d8
Author: TomlongTK <[email protected]>
AuthorDate: Tue Jun 11 19:08:41 2024 +0800
Triple http limiting the size of the HTTP request and response (#14246)
* Triple http limiting the size of the HTTP request and response
* Limit http1 and http2 response body
* Native http2 unary calls use Http2ServerUnaryChannelObserver; the flag
should also be set when the http1 connection is disconnected
* Format code
* Fix some problems
* Revert netty new api
* Code format
---------
Co-authored-by: earthchen <[email protected]>
---
.../http12/AbstractServerHttpChannelObserver.java | 35 +++++++++++--
.../dubbo/remoting/http12/HttpChannelObserver.java | 2 +-
.../remoting/http12/HttpTransportListener.java | 3 +-
.../apache/dubbo/remoting/http12/HttpUtils.java | 6 +--
.../http12/LimitedByteBufOutputStream.java | 58 ++++++++++++++++++++++
.../HttpOverPayloadException.java} | 8 +--
.../http12/h2/Http2ServerChannelObserver.java | 25 ++--------
.../remoting/http12/h2/Http2TransportListener.java | 5 +-
.../remoting/http12/message/codec/BinaryCodec.java | 3 ++
.../remoting/http12/message/codec/HtmlCodec.java | 3 ++
.../remoting/http12/message/codec/JsonCodec.java | 9 +++-
.../remoting/http12/message/codec/JsonPbCodec.java | 5 ++
.../http12/message/codec/PlainTextCodec.java | 3 ++
.../http12/message/codec/UrlEncodeFormCodec.java | 5 ++
.../remoting/http12/message/codec/XmlCodec.java | 5 ++
.../remoting/http12/message/codec/YamlCodec.java | 9 ++++
.../http12/netty4/h1/NettyHttp1Channel.java | 11 ++--
.../netty4/h1/NettyHttp1ConnectionHandler.java | 8 ++-
.../http12/netty4/h2/NettyH2StreamChannel.java | 10 +++-
.../http12/netty4/h2/NettyHttp2FrameCodec.java | 2 +-
.../h2/NettyHttp2ProtocolSelectorHandler.java | 9 +++-
.../dubbo/rpc/protocol/tri/TripleHeaderEnum.java | 3 +-
.../rpc/protocol/tri/TripleHttp2Protocol.java | 20 ++++----
.../h12/grpc/GrpcHttp2ServerTransportListener.java | 10 ++++
.../tri/h12/grpc/GrpcServerChannelObserver.java | 15 ++++--
.../DefaultHttp11ServerTransportListener.java | 5 ++
.../http2/GenericHttp2ServerTransportListener.java | 44 +++++++++++-----
.../h12/http2/Http2ServerUnaryChannelObserver.java | 52 +++++++++++++++++++
.../protocol/tri/rest/RestHttpMessageCodec.java | 3 ++
29 files changed, 303 insertions(+), 73 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index 26ce8b705e..fb9c54f79c 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -35,6 +35,10 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
private boolean headerSent;
+ private boolean completed;
+
+ private boolean closed;
+
protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
this.httpChannel = httpChannel;
}
@@ -69,6 +73,9 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
@Override
public final void onNext(Object data) {
+ if (closed) {
+ return;
+ }
try {
doOnNext(data);
} catch (Throwable e) {
@@ -85,9 +92,12 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
@Override
public final void onError(Throwable throwable) {
+ if (closed) {
+ return;
+ }
if (throwable instanceof HttpResultPayloadException) {
onNext(((HttpResultPayloadException) throwable).getResult());
- doOnCompleted(null);
+ onCompleted(null);
return;
}
try {
@@ -95,7 +105,7 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
} catch (Throwable ex) {
throwable = new EncodeException(ex);
} finally {
- doOnCompleted(throwable);
+ onCompleted(throwable);
}
}
@@ -110,7 +120,17 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
@Override
public final void onCompleted() {
- doOnCompleted(null);
+ if (closed) {
+ return;
+ }
+ onCompleted(null);
+ }
+
+ private void onCompleted(Throwable throwable) {
+ if (!completed) {
+ doOnCompleted(throwable);
+ completed = true;
+ }
}
protected void doOnCompleted(Throwable throwable) {
@@ -198,4 +218,13 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
getHttpChannel().writeHeader(httpMetadata);
headerSent = true;
}
+
+ @Override
+ public void close() throws Exception {
+ closed();
+ }
+
+ protected final void closed() {
+ closed = true;
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
index 4cb0342642..294d7311bf 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
@@ -18,7 +18,7 @@ package org.apache.dubbo.remoting.http12;
import org.apache.dubbo.common.stream.StreamObserver;
-public interface HttpChannelObserver<T> extends StreamObserver<T> {
+public interface HttpChannelObserver<T> extends StreamObserver<T>,
AutoCloseable {
HttpChannel getHttpChannel();
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
index 9265a3ba93..57e1419eb0 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
@@ -16,7 +16,8 @@
*/
package org.apache.dubbo.remoting.http12;
-public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE
extends HttpInputMessage> {
+public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE
extends HttpInputMessage>
+ extends AutoCloseable {
void onMetadata(HEADER metadata);
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
index a25c107669..23c53f8a3f 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
@@ -185,14 +185,14 @@ public final class HttpUtils {
}
public static HttpRequest.FileUpload readUpload(InterfaceHttpData item) {
- return new DefaultFileUploadAdaptee((FileUpload) item);
+ return new DefaultFileUploadAdapter((FileUpload) item);
}
- private static class DefaultFileUploadAdaptee implements
HttpRequest.FileUpload {
+ private static class DefaultFileUploadAdapter implements
HttpRequest.FileUpload {
private final FileUpload fu;
private InputStream inputStream;
- DefaultFileUploadAdaptee(FileUpload fu) {
+ DefaultFileUploadAdapter(FileUpload fu) {
this.fu = fu;
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java
new file mode 100644
index 0000000000..ac70f88f42
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12;
+
+import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException;
+
+import java.io.IOException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+
+public class LimitedByteBufOutputStream extends ByteBufOutputStream {
+
+ private final int capacity;
+
+ public LimitedByteBufOutputStream(ByteBuf byteBuf, int capacity) {
+ super(byteBuf);
+ this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureCapacity(1);
+ super.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ ensureCapacity(b.length);
+ super.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ ensureCapacity(len);
+ super.write(b, off, len);
+ }
+
+ private void ensureCapacity(int len) {
+ if (writtenBytes() + len > capacity) {
+ throw new HttpOverPayloadException("Response Entity Too Large");
+ }
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java
similarity index 79%
copy from
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
copy to
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java
index 09ad7fe422..c98d51ba4b 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java
@@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.http12.h2;
+package org.apache.dubbo.remoting.http12.exception;
-public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {
+public class HttpOverPayloadException extends HttpStatusException {
- void onStreamClosed();
+ public HttpOverPayloadException(String message) {
+ super(500, message);
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index 61568cc5bb..bd43f2ad92 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -37,8 +37,6 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
private boolean autoRequestN = true;
- private boolean closed = false;
-
public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
super(h2StreamChannel);
}
@@ -78,7 +76,7 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
public void cancel(Throwable throwable) {
if (throwable instanceof CancelStreamException) {
if (((CancelStreamException) throwable).isCancelByRemote()) {
- closed = true;
+ closed();
}
}
this.cancellationContext.cancel(throwable);
@@ -89,22 +87,6 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
getHttpChannel().writeResetFrame(errorCode);
}
- @Override
- public void doOnNext(Object data) throws Throwable {
- if (closed) {
- return;
- }
- super.doOnNext(data);
- }
-
- @Override
- public void doOnError(Throwable throwable) throws Throwable {
- if (closed) {
- return;
- }
- super.doOnError(throwable);
- }
-
@Override
public void request(int count) {
this.streamingDecoder.request(count);
@@ -120,8 +102,9 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
return autoRequestN;
}
- public void onStreamClosed() {
- closed = true;
+ @Override
+ public void close() throws Exception {
+ super.close();
streamingDecoder.onStreamClosed();
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
index 09ad7fe422..16531e7a93 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
@@ -16,7 +16,4 @@
*/
package org.apache.dubbo.remoting.http12.h2;
-public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {
-
- void onStreamClosed();
-}
+public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
index bb3ee1af5c..b939927764 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -49,6 +50,8 @@ public class BinaryCodec implements HttpMessageCodec {
public Object decode(InputStream is, Class<?> targetType, Charset charset)
throws DecodeException {
try {
return StreamUtils.readBytes(is);
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new DecodeException(e);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
index ac1ae805ab..ca3fff4f10 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -48,6 +49,8 @@ public class HtmlCodec implements HttpMessageCodec {
if (targetType == String.class) {
return StreamUtils.toString(is, charset);
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new DecodeException(e);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
index aea3fdb66a..c16dbe2b24 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -35,6 +36,8 @@ public class JsonCodec implements HttpMessageCodec {
public void encode(OutputStream os, Object data, Charset charset) throws
EncodeException {
try {
os.write(JsonUtils.toJson(data).getBytes(charset));
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable t) {
throw new EncodeException("Error encoding json", t);
}
@@ -43,6 +46,8 @@ public class JsonCodec implements HttpMessageCodec {
public void encode(OutputStream os, Object[] data, Charset charset) throws
EncodeException {
try {
os.write(JsonUtils.toJson(data).getBytes(charset));
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable t) {
throw new EncodeException("Error encoding json", t);
}
@@ -52,6 +57,8 @@ public class JsonCodec implements HttpMessageCodec {
public Object decode(InputStream is, Class<?> targetType, Charset charset)
throws DecodeException {
try {
return JsonUtils.toJavaObject(StreamUtils.toString(is, charset),
targetType);
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable t) {
throw new DecodeException("Error decoding json", t);
}
@@ -78,7 +85,7 @@ public class JsonCodec implements HttpMessageCodec {
return new Object[] {JsonUtils.convertObject(obj,
targetTypes[0])};
}
throw new DecodeException("Json must be array");
- } catch (DecodeException e) {
+ } catch (HttpStatusException e) {
throw e;
} catch (Throwable t) {
throw new DecodeException("Error decoding json", t);
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
index 86131abb99..a3b2dbdea4 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.MethodUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import java.io.IOException;
import java.io.InputStream;
@@ -54,6 +55,8 @@ public final class JsonPbCodec extends JsonCodec {
JsonFormat.parser().ignoringUnknownFields().merge(StreamUtils.toString(is,
charset), newBuilder);
return newBuilder.build();
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable e) {
throw new DecodeException("Error decoding jsonPb", e);
}
@@ -67,6 +70,8 @@ public final class JsonPbCodec extends JsonCodec {
// protobuf only support one parameter
return new Object[] {decode(is, targetTypes[0], charset)};
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable e) {
throw new DecodeException("Error decoding jsonPb", e);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
index 33d067686e..48e41c657d 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -51,6 +52,8 @@ public final class PlainTextCodec implements HttpMessageCodec
{
if (targetType == String.class) {
return StreamUtils.toString(is, charset);
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new DecodeException(e);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
index bd51a2c91e..643bc6211e 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.convert.ConverterUtil;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -63,6 +64,8 @@ public class UrlEncodeFormCodec implements HttpMessageCodec {
} else {
throw new EncodeException("UrlEncodeFrom media-type only
supports String or Map as return type.");
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new EncodeException(e);
}
@@ -99,6 +102,8 @@ public class UrlEncodeFormCodec implements HttpMessageCodec {
} else {
return res.values().toArray();
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new DecodeException(e);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
index d63728083a..3627aea6e4 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -47,6 +48,8 @@ public class XmlCodec implements HttpMessageCodec {
try (OutputStreamWriter writer = new OutputStreamWriter(os,
charset)) {
marshaller.marshal(data, writer);
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new EncodeException("Error encoding xml", e);
}
@@ -63,6 +66,8 @@ public class XmlCodec implements HttpMessageCodec {
Unmarshaller unmarshaller = context.createUnmarshaller();
return unmarshaller.unmarshal(xmlSource);
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new DecodeException("Error decoding xml", e);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
index 30380eb159..9362bcaf8b 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.DefaultSerializeClassChecker;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -43,6 +44,8 @@ public class YamlCodec implements HttpMessageCodec {
public Object decode(InputStream is, Class<?> targetType, Charset charset)
throws DecodeException {
try (InputStreamReader reader = new InputStreamReader(is, charset)) {
return createYaml().loadAs(reader, (Class) targetType);
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable t) {
throw new DecodeException("Error decoding yaml", t);
}
@@ -69,6 +72,8 @@ public class YamlCodec implements HttpMessageCodec {
}
}
return results;
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable t) {
throw new DecodeException("Error decoding yaml", t);
}
@@ -78,6 +83,8 @@ public class YamlCodec implements HttpMessageCodec {
public void encode(OutputStream os, Object data, Charset charset) throws
EncodeException {
try (OutputStreamWriter writer = new OutputStreamWriter(os, charset)) {
createYaml().dump(data, writer);
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable t) {
throw new EncodeException("Error encoding yaml", t);
}
@@ -87,6 +94,8 @@ public class YamlCodec implements HttpMessageCodec {
public void encode(OutputStream os, Object[] data, Charset charset) throws
EncodeException {
try (OutputStreamWriter writer = new OutputStreamWriter(os, charset)) {
createYaml().dump(data, writer);
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Throwable t) {
throw new EncodeException("Error encoding yaml", t);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
index 443bd21aec..086dde1afd 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
@@ -16,24 +16,28 @@
*/
package org.apache.dubbo.remoting.http12.netty4.h1;
+import org.apache.dubbo.config.nested.TripleConfig;
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream;
import org.apache.dubbo.remoting.http12.h1.Http1OutputMessage;
import org.apache.dubbo.remoting.http12.netty4.NettyHttpChannelFutureListener;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
-import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
public class NettyHttp1Channel implements HttpChannel {
private final Channel channel;
- public NettyHttp1Channel(Channel channel) {
+ private final TripleConfig tripleConfig;
+
+ public NettyHttp1Channel(Channel channel, TripleConfig tripleConfig) {
this.channel = channel;
+ this.tripleConfig = tripleConfig;
}
@Override
@@ -52,7 +56,8 @@ public class NettyHttp1Channel implements HttpChannel {
@Override
public HttpOutputMessage newOutputMessage() {
- return new Http1OutputMessage(new
ByteBufOutputStream(channel.alloc().buffer()));
+ return new Http1OutputMessage(
+ new LimitedByteBufOutputStream(channel.alloc().buffer(),
tripleConfig.getMaxResponseBodySize()));
}
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
index 7380024f7c..452faba1b2 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.http12.netty4.h1;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.nested.TripleConfig;
import org.apache.dubbo.remoting.http12.h1.Http1Request;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListenerFactory;
@@ -33,12 +34,16 @@ public class NettyHttp1ConnectionHandler extends
SimpleChannelInboundHandler<Htt
private final Http1ServerTransportListenerFactory
http1ServerTransportListenerFactory;
+ private final TripleConfig tripleConfig;
+
public NettyHttp1ConnectionHandler(
URL url,
FrameworkModel frameworkModel,
+ TripleConfig tripleConfig,
Http1ServerTransportListenerFactory
http1ServerTransportListenerFactory) {
this.url = url;
this.frameworkModel = frameworkModel;
+ this.tripleConfig = tripleConfig;
this.http1ServerTransportListenerFactory =
http1ServerTransportListenerFactory;
}
@@ -47,8 +52,9 @@ public class NettyHttp1ConnectionHandler extends
SimpleChannelInboundHandler<Htt
*/
protected void channelRead0(ChannelHandlerContext ctx, Http1Request
http1Request) {
Http1ServerTransportListener http1TransportListener =
http1ServerTransportListenerFactory.newInstance(
- new NettyHttp1Channel(ctx.channel()), url, frameworkModel);
+ new NettyHttp1Channel(ctx.channel(), tripleConfig), url,
frameworkModel);
http1TransportListener.onMetadata(http1Request);
http1TransportListener.onData(http1Request);
+ ctx.channel().closeFuture().addListener(future ->
http1TransportListener.close());
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
index f85b127634..edebd985cc 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
@@ -16,8 +16,10 @@
*/
package org.apache.dubbo.remoting.http12.netty4.h2;
+import org.apache.dubbo.config.nested.TripleConfig;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2OutputMessage;
import org.apache.dubbo.remoting.http12.h2.Http2OutputMessageFrame;
@@ -35,8 +37,11 @@ public class NettyH2StreamChannel implements H2StreamChannel {
private final Http2StreamChannel http2StreamChannel;
- public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel) {
+ private final TripleConfig tripleConfig;
+
+ public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel,
TripleConfig tripleConfig) {
this.http2StreamChannel = http2StreamChannel;
+ this.tripleConfig = tripleConfig;
}
@Override
@@ -57,7 +62,8 @@ public class NettyH2StreamChannel implements H2StreamChannel {
@Override
public Http2OutputMessage newOutputMessage(boolean endStream) {
ByteBuf buffer = http2StreamChannel.alloc().buffer();
- ByteBufOutputStream outputStream = new ByteBufOutputStream(buffer);
+ ByteBufOutputStream outputStream =
+ new LimitedByteBufOutputStream(buffer,
tripleConfig.getMaxResponseBodySize());
return new Http2OutputMessageFrame(outputStream, endStream);
}
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
index 9496c94a00..ccac52f5ff 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
@@ -91,7 +91,7 @@ public class NettyHttp2FrameCodec extends ChannelDuplexHandler {
private Http2HeadersFrame encodeHttp2HeadersFrame(Http2Header http2Header)
{
HttpHeaders headers = http2Header.headers();
- DefaultHttp2Headers http2Headers = new DefaultHttp2Headers();
+ DefaultHttp2Headers http2Headers = new DefaultHttp2Headers(false);
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
String name = entry.getKey();
List<String> value = entry.getValue();
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
index 582b656272..26fda3e87e 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.http12.netty4.h2;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.nested.TripleConfig;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
@@ -42,14 +43,18 @@ public class NettyHttp2ProtocolSelectorHandler extends SimpleChannelInboundHandl
private final FrameworkModel frameworkModel;
+ private final TripleConfig tripleConfig;
+
private final Http2ServerTransportListenerFactory
defaultHttp2ServerTransportListenerFactory;
public NettyHttp2ProtocolSelectorHandler(
URL url,
FrameworkModel frameworkModel,
+ TripleConfig tripleConfig,
Http2ServerTransportListenerFactory
defaultHttp2ServerTransportListenerFactory) {
this.url = url;
this.frameworkModel = frameworkModel;
+ this.tripleConfig = tripleConfig;
this.defaultHttp2ServerTransportListenerFactory =
defaultHttp2ServerTransportListenerFactory;
}
@@ -61,7 +66,7 @@ public class NettyHttp2ProtocolSelectorHandler extends SimpleChannelInboundHandl
if (factory == null) {
throw new UnsupportedMediaTypeException(contentType);
}
- H2StreamChannel h2StreamChannel = new
NettyH2StreamChannel((Http2StreamChannel) ctx.channel());
+ H2StreamChannel h2StreamChannel = new
NettyH2StreamChannel((Http2StreamChannel) ctx.channel(), tripleConfig);
HttpWriteQueueHandler writeQueueHandler =
ctx.channel().parent().pipeline().get(HttpWriteQueueHandler.class);
if (writeQueueHandler != null) {
@@ -70,7 +75,7 @@ public class NettyHttp2ProtocolSelectorHandler extends SimpleChannelInboundHandl
}
ChannelPipeline pipeline = ctx.pipeline();
Http2TransportListener http2TransportListener =
factory.newInstance(h2StreamChannel, url, frameworkModel);
- ctx.channel().closeFuture().addListener(future ->
http2TransportListener.onStreamClosed());
+ ctx.channel().closeFuture().addListener(future ->
http2TransportListener.close());
pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel,
http2TransportListener));
pipeline.remove(this);
ctx.fireChannelRead(metadata);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 2c42b3f7bd..1bd9545b46 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -45,8 +45,7 @@ public enum TripleHeaderEnum {
SERVICE_GROUP("tri-service-group"),
SERVICE_TIMEOUT("tri-service-timeout"),
TRI_HEADER_CONVERT("tri-header-convert"),
- TRI_EXCEPTION_CODE("tri-exception-code"),
- ;
+ TRI_EXCEPTION_CODE("tri-exception-code");
static final Map<String, TripleHeaderEnum> enumMap = new HashMap<>();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 415e8d7a31..cee75e3ec3 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -97,6 +97,7 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
.maxFrameSize(tripleConfig.getMaxFrameSize())
.maxHeaderListSize(tripleConfig.getMaxHeaderListSize()))
.frameLogger(CLIENT_LOGGER)
+ .validateHeaders(false)
.build();
//
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
List<ChannelHandler> handlers = new ArrayList<>();
@@ -142,11 +143,11 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
protocol -> {
if
(AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME,
protocol)) {
return new Http2ServerUpgradeCodec(
- buildHttp2FrameCodec(url),
+ buildHttp2FrameCodec(tripleConfig),
new HttpWriteQueueHandler(),
new FlushConsolidationHandler(64, true),
new TripleServerConnectionHandler(),
- buildHttp2MultiplexHandler(url),
+ buildHttp2MultiplexHandler(url, tripleConfig),
new TripleTailHandler());
}
// Not upgrade request
@@ -159,24 +160,25 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
handlers.add(new ChannelHandlerPretender(new
HttpObjectAggregator(tripleConfig.getMaxBodySize())));
handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec()));
handlers.add(new ChannelHandlerPretender(new
NettyHttp1ConnectionHandler(
- url, frameworkModel,
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
+ url, frameworkModel, tripleConfig,
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
}
- private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) {
+ private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url,
TripleConfig tripleConfig) {
return new Http2MultiplexHandler(new
ChannelInitializer<Http2StreamChannel>() {
@Override
protected void initChannel(Http2StreamChannel ch) {
final ChannelPipeline p = ch.pipeline();
p.addLast(new NettyHttp2FrameCodec());
p.addLast(new NettyHttp2ProtocolSelectorHandler(
- url, frameworkModel,
GenericHttp2ServerTransportListenerFactory.INSTANCE));
+ url, frameworkModel, tripleConfig,
GenericHttp2ServerTransportListenerFactory.INSTANCE));
}
});
}
private void configurerHttp2Handlers(URL url, List<ChannelHandler>
handlers) {
- final Http2FrameCodec codec = buildHttp2FrameCodec(url);
- final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url);
+ TripleConfig tripleConfig = getTripleConfig(url);
+ final Http2FrameCodec codec = buildHttp2FrameCodec(tripleConfig);
+ final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url,
tripleConfig);
handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
handlers.add(new ChannelHandlerPretender(codec));
handlers.add(new ChannelHandlerPretender(new
FlushConsolidationHandler(64, true)));
@@ -185,8 +187,7 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
}
- private Http2FrameCodec buildHttp2FrameCodec(URL url) {
- TripleConfig tripleConfig = getTripleConfig(url);
+ private Http2FrameCodec buildHttp2FrameCodec(TripleConfig tripleConfig) {
return TripleHttp2FrameCodecBuilder.forServer()
.customizeConnection((connection) ->
connection.remote().flowController(new
TriHttp2RemoteFlowController(connection, tripleConfig)))
@@ -198,6 +199,7 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
.maxFrameSize(tripleConfig.getMaxFrameSize())
.maxHeaderListSize(tripleConfig.getMaxHeaderListSize()))
.frameLogger(SERVER_LOGGER)
+ .validateHeaders(false)
.build();
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 9ae7881302..1b25dc57a1 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
+import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
@@ -75,11 +76,20 @@ public class GrpcHttp2ServerTransportListener extends GenericHttp2ServerTranspor
return new GrpcStreamingDecoder();
}
+ @Override
+ protected Http2ServerChannelObserver newHttp2ServerChannelObserver(
+ FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) {
+ return new GrpcServerChannelObserver(frameworkModel, h2StreamChannel);
+ }
+
@Override
protected HttpMessageListener buildHttpMessageListener() {
return getContext().isHasStub() ? super.buildHttpMessageListener() :
new LazyFindMethodListener();
}
+ @Override
+ protected void onUnary() {}
+
@Override
protected void onMetadataCompletion(Http2Header metadata) {
super.onMetadataCompletion(metadata);
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java
similarity index 59%
copy from dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java
index 4cb0342642..5b044c400f 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java
@@ -14,11 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.http12;
+package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
-import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import
org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2ServerCallToObserverAdapter;
-public interface HttpChannelObserver<T> extends StreamObserver<T> {
+public class GrpcServerChannelObserver extends
Http2ServerCallToObserverAdapter {
- HttpChannel getHttpChannel();
+ public GrpcServerChannelObserver(FrameworkModel frameworkModel,
H2StreamChannel h2StreamChannel) {
+ super(frameworkModel, h2StreamChannel);
+ }
+
+ @Override
+ protected void doOnError(Throwable throwable) {}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index 950a2d9350..3f9406a3cc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -108,6 +108,11 @@ public class DefaultHttp11ServerTransportListener
serverChannelObserver.onError(throwable);
}
+ @Override
+ public void close() throws Exception {
+ serverChannelObserver.close();
+ }
+
private static class AutoCompleteUnaryServerCallListener extends
UnaryServerCallListener {
public AutoCompleteUnaryServerCallListener(
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index a44fefcd47..9c46944ccf 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -61,8 +61,9 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport
private final ExecutorSupport executorSupport;
private final StreamingDecoder streamingDecoder;
- private final Http2ServerChannelObserver serverChannelObserver;
-
+ private final FrameworkModel frameworkModel;
+ private final H2StreamChannel h2StreamChannel;
+ private Http2ServerChannelObserver serverChannelObserver;
private ServerCallListener serverCallListener;
public GenericHttp2ServerTransportListener(
@@ -71,15 +72,22 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport
executorSupport =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel())
.getExecutorSupport(url);
streamingDecoder = newStreamingDecoder();
- serverChannelObserver = new
Http2ServerCallToObserverAdapter(frameworkModel, h2StreamChannel);
+ serverChannelObserver = newHttp2ServerChannelObserver(frameworkModel,
h2StreamChannel);
serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE);
serverChannelObserver.setStreamingDecoder(streamingDecoder);
+ this.frameworkModel = frameworkModel;
+ this.h2StreamChannel = h2StreamChannel;
}
protected StreamingDecoder newStreamingDecoder() {
return new DefaultStreamingDecoder();
}
+ protected Http2ServerChannelObserver newHttp2ServerChannelObserver(
+ FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) {
+ return new Http2ServerCallToObserverAdapter(frameworkModel,
h2StreamChannel);
+ }
+
@Override
protected Executor initializeExecutor(Http2Header metadata) {
return new SerializingExecutor(executorSupport.getExecutor(metadata));
@@ -114,11 +122,9 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport
private ServerCallListener startListener(
RpcInvocation invocation, MethodDescriptor methodDescriptor,
Invoker<?> invoker) {
- Http2ServerChannelObserver responseObserver =
getServerChannelObserver();
- CancellationContext cancellationContext =
RpcContext.getCancellationContext();
- responseObserver.setCancellationContext(cancellationContext);
switch (methodDescriptor.getRpcType()) {
case UNARY:
+ onUnary();
boolean applyCustomizeException = false;
if (!getContext().isHasStub()) {
MethodMetadata methodMetadata =
getContext().getMethodMetadata();
@@ -127,19 +133,34 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport
methodMetadata.getActualRequestTypes(),
methodMetadata.getActualResponseType());
}
- UnaryServerCallListener unaryServerCallListener =
startUnary(invocation, invoker, responseObserver);
+ onListenerStart();
+ UnaryServerCallListener unaryServerCallListener =
+ startUnary(invocation, invoker,
getServerChannelObserver());
unaryServerCallListener.setApplyCustomizeException(applyCustomizeException);
return unaryServerCallListener;
case SERVER_STREAM:
- return startServerStreaming(invocation, invoker,
responseObserver);
+ onListenerStart();
+ return startServerStreaming(invocation, invoker,
getServerChannelObserver());
case BI_STREAM:
case CLIENT_STREAM:
- return startBiStreaming(invocation, invoker, responseObserver);
+ onListenerStart();
+ return startBiStreaming(invocation, invoker,
getServerChannelObserver());
default:
throw new IllegalStateException("Can not reach here");
}
}
+ protected void onUnary() {
+ serverChannelObserver = new
Http2ServerUnaryChannelObserver(frameworkModel, h2StreamChannel);
+ serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE);
+ serverChannelObserver.setStreamingDecoder(streamingDecoder);
+ }
+
+ protected void onListenerStart() {
+ CancellationContext cancellationContext =
RpcContext.getCancellationContext();
+ serverChannelObserver.setCancellationContext(cancellationContext);
+ }
+
private UnaryServerCallListener startUnary(
RpcInvocation invocation, Invoker<?> invoker,
Http2ServerChannelObserver responseObserver) {
return new UnaryServerCallListener(invocation, invoker,
responseObserver);
@@ -201,9 +222,8 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport
}
@Override
- public void onStreamClosed() {
- // doing on event loop thread
- getServerChannelObserver().onStreamClosed();
+ public void close() throws Exception {
+ getServerChannelObserver().close();
}
private static class Http2StreamingDecodeListener implements
ListeningDecoder.Listener {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java
new file mode 100644
index 0000000000..10d2c2a530
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h12.http2;
+
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+public class Http2ServerUnaryChannelObserver extends
Http2ServerCallToObserverAdapter {
+
+ public Http2ServerUnaryChannelObserver(FrameworkModel frameworkModel,
H2StreamChannel h2StreamChannel) {
+ super(frameworkModel, h2StreamChannel);
+ }
+
+ @Override
+ public void doOnNext(Object data) throws Throwable {
+ HttpOutputMessage httpOutputMessage = buildMessage(data);
+ sendHeader(buildMetadata(resolveStatusCode(data), data,
httpOutputMessage));
+ sendMessage(httpOutputMessage);
+ }
+
+ @Override
+ public void doOnError(Throwable throwable) throws Throwable {
+ String statusCode = resolveStatusCode(throwable);
+ Object data = buildErrorResponse(statusCode, throwable);
+ HttpOutputMessage httpOutputMessage = buildMessage(data);
+ sendHeader(buildMetadata(statusCode, data, httpOutputMessage));
+ sendMessage(httpOutputMessage);
+ }
+
+ @Override
+ protected void doOnCompleted(Throwable throwable) {}
+
+ @Override
+ protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
+ return getHttpChannel().newOutputMessage(true);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java
index 23462b7b6b..448a2b7db4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.remoting.http12.HttpRequest;
import org.apache.dubbo.remoting.http12.HttpResponse;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageDecoder;
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -108,6 +109,8 @@ public final class RestHttpMessageCodec implements HttpMessageDecoder, HttpMessa
if (messageEncoder.mediaType().isPureText() && type !=
String.class) {
data = typeConverter.convert(data, String.class);
}
+ } catch (HttpStatusException e) {
+ throw e;
} catch (Exception e) {
throw new EncodeException(e);
}