This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new c23af2487d Http2 sse support (#14673)
c23af2487d is described below
commit c23af2487d5cdead0b543bda6bbd11e7ce2933b4
Author: Sean Yang <[email protected]>
AuthorDate: Sun Sep 29 10:07:11 2024 +0800
Http2 sse support (#14673)
---
.../dubbo/remoting/http12/HttpConstants.java | 7 ++++++
.../dubbo/remoting/http12/HttpHeaderNames.java | 2 ++
.../apache/dubbo/remoting/http12/HttpMetadata.java | 5 +++++
...ver.java => Http1SseServerChannelObserver.java} | 19 ++++++----------
.../remoting/http12/message/codec/JsonPbCodec.java | 3 ++-
.../h2/NettyHttp2ProtocolSelectorHandler.java | 5 +----
.../main/java/org/apache/dubbo/rpc/Constants.java | 1 -
.../DefaultHttp11ServerTransportListener.java | 8 +++----
.../h12/http1/Http1UnaryServerChannelObserver.java | 2 +-
.../http2/GenericHttp2ServerTransportListener.java | 9 ++++++--
.../h12/http2/Http2SseServerChannelObserver.java | 25 +++++++++-------------
.../dubbo/rpc/protocol/tri/test/TestResponse.java | 9 ++++++--
.../rpc/protocol/tri/test/TestRunnerImpl.java | 2 +-
13 files changed, 54 insertions(+), 43 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
index a45e8fe396..ef6995b857 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
@@ -16,12 +16,16 @@
*/
package org.apache.dubbo.remoting.http12;
+import java.nio.charset.StandardCharsets;
+
public final class HttpConstants {
public static final String TRAILERS = "trailers";
public static final String CHUNKED = "chunked";
+ public static final String NO_CACHE = "no-cache";
+
public static final String X_FORWARDED_PROTO = "x-forwarded-proto";
public static final String X_FORWARDED_HOST = "x-forwarded-host";
public static final String X_FORWARDED_PORT = "x-forwarded-port";
@@ -29,5 +33,8 @@ public final class HttpConstants {
public static final String HTTPS = "https";
public static final String HTTP = "http";
+ public static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES =
"data:".getBytes(StandardCharsets.US_ASCII);
+ public static final byte[] SERVER_SENT_EVENT_LF_BYTES =
"\n\n".getBytes(StandardCharsets.US_ASCII);
+
private HttpConstants() {}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
index 0abfb8658f..5272913d3e 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
@@ -34,6 +34,8 @@ public enum HttpHeaderNames {
TRANSFER_ENCODING(io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING),
+ CACHE_CONTROL(io.netty.handler.codec.http.HttpHeaderNames.CACHE_CONTROL),
+
LOCATION(io.netty.handler.codec.http.HttpHeaderNames.LOCATION),
HOST(io.netty.handler.codec.http.HttpHeaderNames.HOST),
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
index acf8a5d171..48cdf45633 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
@@ -27,4 +27,9 @@ public interface HttpMetadata {
default String header(CharSequence name) {
return headers().getFirst(name);
}
+
+ default HttpMetadata header(CharSequence name, String value) {
+ headers().set(name, value);
+ return this;
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
similarity index 68%
copy from
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
copy to
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
index b401075e89..486af06e2a 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
@@ -19,40 +19,35 @@ package org.apache.dubbo.remoting.http12.h1;
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
-import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-public class Http1ServerStreamChannelObserver extends
Http1ServerChannelObserver {
+public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
- private static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES =
"data:".getBytes(StandardCharsets.US_ASCII);
- private static final byte[] SERVER_SENT_EVENT_LF_BYTES =
"\n\n".getBytes(StandardCharsets.US_ASCII);
-
- public Http1ServerStreamChannelObserver(HttpChannel httpChannel) {
+ public Http1SseServerChannelObserver(HttpChannel httpChannel) {
super(httpChannel);
}
@Override
protected HttpMetadata encodeHttpMetadata(boolean endStream) {
- HttpHeaders headers = HttpHeaders.create();
- headers.set(HttpHeaderNames.TRANSFER_ENCODING.getKey(),
HttpConstants.CHUNKED);
- return new Http1Metadata(headers);
+ return super.encodeHttpMetadata(endStream)
+ .header(HttpHeaderNames.TRANSFER_ENCODING.getKey(),
HttpConstants.CHUNKED)
+ .header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
}
@Override
protected void preOutputMessage(HttpOutputMessage message) throws
IOException {
HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
- prefixMessage.getBody().write(SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
+
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
getHttpChannel().writeMessage(prefixMessage);
}
@Override
protected void postOutputMessage(HttpOutputMessage message) throws
IOException {
HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
- lfMessage.getBody().write(SERVER_SENT_EVENT_LF_BYTES);
+ lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
getHttpChannel().writeMessage(lfMessage);
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
index 725dc985e4..502f264e2f 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
@@ -47,7 +47,8 @@ public final class JsonPbCodec extends JsonCodec {
public void encode(OutputStream os, Object data, Charset charset) throws
EncodeException {
try {
if (data instanceof Message) {
- String jsonString = JsonFormat.printer().print((Message) data);
+ String jsonString =
+
JsonFormat.printer().omittingInsignificantWhitespace().print((Message) data);
os.write(jsonString.getBytes(charset));
return;
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
index 9dc05b48d3..c23e176531 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
@@ -21,8 +21,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.config.nested.TripleConfig;
-import org.apache.dubbo.remoting.http12.HttpHeaderNames;
-import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.command.HttpWriteQueue;
import
org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException;
@@ -64,8 +62,7 @@ public class NettyHttp2ProtocolSelectorHandler extends
SimpleChannelInboundHandl
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata
metadata) {
- HttpHeaders headers = metadata.headers();
- String contentType =
headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
+ String contentType = metadata.contentType();
Http2ServerTransportListenerFactory factory =
UrlUtils.computeServiceAttribute(
url,
TRANSPORT_LISTENER_FACTORY_CACHE,
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index 6fee317668..d7512eeb88 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -115,7 +115,6 @@ public interface Constants {
String H2_SETTINGS_SERVLET_ENABLED =
"dubbo.protocol.triple.servlet.enabled";
String H3_SETTINGS_HTTP3_ENABLED = "dubbo.protocol.triple.http3.enabled";
String H3_SETTINGS_HTTP3_NEGOTIATION =
"dubbo.protocol.triple.http3.negotiation";
- String H2_SETTINGS_CONNECTION_INITIAL_WINDOW_SIZE_KEY =
"dubbo.rpc.tri.connection-initial-window-size";
String ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY = "lb_adaptive";
String ADAPTIVE_LOADBALANCE_START_TIME = "adaptive_startTime";
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index e59cb07421..5e03be4c40 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -24,8 +24,8 @@ import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.RequestMetadata;
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
-import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
+import org.apache.dubbo.remoting.http12.h1.Http1SseServerChannelObserver;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
@@ -81,9 +81,9 @@ public class DefaultHttp11ServerTransportListener
case UNARY:
return new AutoCompleteUnaryServerCallListener(invocation,
invoker, responseObserver);
case SERVER_STREAM:
- responseObserver = prepareResponseObserver(new
Http1ServerStreamChannelObserver(httpChannel));
+ responseObserver = prepareResponseObserver(new
Http1SseServerChannelObserver(httpChannel));
responseObserver.addHeadersCustomizer((hs, t) ->
- hs.set(HttpHeaderNames.CONTENT_TYPE.getName(),
MediaType.TEXT_EVENT_STREAM.getName()));
+ hs.set(HttpHeaderNames.CONTENT_TYPE.getKey(),
MediaType.TEXT_EVENT_STREAM.getName()));
return new
AutoCompleteServerStreamServerCallListener(invocation, invoker,
responseObserver);
default:
throw new UnsupportedOperationException("HTTP1.x only support
unary and server-stream");
@@ -104,7 +104,7 @@ public class DefaultHttp11ServerTransportListener
protected void initializeAltSvc(URL url) {
String protocolId = Http3Exchanger.isEnabled(url) ? "h3" : "h2";
String value = protocolId + "=\":" +
url.getParameter(Constants.BIND_PORT_KEY, url.getPort()) + '"';
- responseObserver.addHeadersCustomizer((hs, t) ->
hs.set(HttpHeaderNames.ALT_SVC.getName(), value));
+ responseObserver.addHeadersCustomizer((hs, t) ->
hs.set(HttpHeaderNames.ALT_SVC.getKey(), value));
}
private static final class AutoCompleteUnaryServerCallListener extends
UnaryServerCallListener {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
index 5b543957a2..1aa994de52 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
@@ -66,7 +66,7 @@ public final class Http1UnaryServerChannelObserver extends
Http1ServerChannelObs
throw new IllegalArgumentException("Unsupported body type: " +
body.getClass());
}
}
- headers.set(HttpHeaderNames.CONTENT_LENGTH.getName(),
String.valueOf(contentLength));
+ headers.set(HttpHeaderNames.CONTENT_LENGTH.getKey(),
String.valueOf(contentLength));
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 9f395c5507..f4f0aeefb3 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -28,6 +28,7 @@ import
org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.DefaultStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.ListeningDecoder;
+import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
import org.apache.dubbo.rpc.Invoker;
@@ -71,7 +72,11 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
}
protected Http2ServerChannelObserver
newStreamResponseObserver(H2StreamChannel h2StreamChannel) {
- return new Http2StreamServerChannelObserver(getFrameworkModel(),
h2StreamChannel);
+ Http2ServerChannelObserver responseObserver =
+ new Http2SseServerChannelObserver(getFrameworkModel(),
h2StreamChannel);
+ responseObserver.addHeadersCustomizer(
+ (hs, t) -> hs.set(HttpHeaderNames.CONTENT_TYPE.getKey(),
MediaType.TEXT_EVENT_STREAM.getName()));
+ return responseObserver;
}
protected Http2ServerChannelObserver
prepareResponseObserver(Http2ServerChannelObserver responseObserver) {
@@ -124,7 +129,7 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
protected void initializeAltSvc(URL url) {
if (Http3Exchanger.isEnabled(url)) {
String value = "h3=\":" +
url.getParameter(Constants.BIND_PORT_KEY, url.getPort()) + '"';
- responseObserver.addHeadersCustomizer((hs, t) ->
hs.set(HttpHeaderNames.ALT_SVC.getName(), value));
+ responseObserver.addHeadersCustomizer((hs, t) ->
hs.set(HttpHeaderNames.ALT_SVC.getKey(), value));
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
similarity index 63%
rename from
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
rename to
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index b401075e89..b36dc5e353 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -14,45 +14,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.http12.h1;
+package org.apache.dubbo.rpc.protocol.tri.h12.http2;
-import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
-import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-public class Http1ServerStreamChannelObserver extends
Http1ServerChannelObserver {
+public final class Http2SseServerChannelObserver extends
Http2StreamServerChannelObserver {
- private static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES =
"data:".getBytes(StandardCharsets.US_ASCII);
- private static final byte[] SERVER_SENT_EVENT_LF_BYTES =
"\n\n".getBytes(StandardCharsets.US_ASCII);
-
- public Http1ServerStreamChannelObserver(HttpChannel httpChannel) {
- super(httpChannel);
+ public Http2SseServerChannelObserver(FrameworkModel frameworkModel,
H2StreamChannel h2StreamChannel) {
+ super(frameworkModel, h2StreamChannel);
}
@Override
protected HttpMetadata encodeHttpMetadata(boolean endStream) {
- HttpHeaders headers = HttpHeaders.create();
- headers.set(HttpHeaderNames.TRANSFER_ENCODING.getKey(),
HttpConstants.CHUNKED);
- return new Http1Metadata(headers);
+ return super.encodeHttpMetadata(endStream)
+ .header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
}
@Override
protected void preOutputMessage(HttpOutputMessage message) throws
IOException {
HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
- prefixMessage.getBody().write(SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
+
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
getHttpChannel().writeMessage(prefixMessage);
}
@Override
protected void postOutputMessage(HttpOutputMessage message) throws
IOException {
HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
- lfMessage.getBody().write(SERVER_SENT_EVENT_LF_BYTES);
+ lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
getHttpChannel().writeMessage(lfMessage);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
index 3d42580293..44470d9e41 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageDecoder;
+import org.apache.dubbo.remoting.http12.message.MediaType;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -81,8 +82,12 @@ public class TestResponse {
List<T> bodies = (List<T>) this.bodies;
if (bodies == null) {
bodies = new ArrayList<>(oss.size());
- for (OutputStream os : oss) {
- ByteArrayOutputStream bos = (ByteArrayOutputStream) os;
+ boolean isTextEvent =
MediaType.TEXT_EVENT_STREAM.getName().equals(getContentType());
+ for (int i = 0, size = oss.size(); i < size; i++) {
+ if (isTextEvent && i % 3 != 1) {
+ continue;
+ }
+ ByteArrayOutputStream bos = (ByteArrayOutputStream) oss.get(i);
if (bos.size() == 0) {
bodies.add(null);
} else {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
index c5df5c2db9..439b96fb32 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
@@ -92,7 +92,7 @@ final class TestRunnerImpl implements TestRunner {
}
@Override
- @SuppressWarnings({"unchecked", "resource"})
+ @SuppressWarnings("unchecked")
public TestResponse run(TestRequest request) {
MockH2StreamChannel channel = new MockH2StreamChannel();
URL url = new URL(TestProtocol.NAME, TestProtocol.HOST,
TestProtocol.PORT, request.getProviderParams());