This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new c23af2487d Http2 sse support (#14673)
c23af2487d is described below

commit c23af2487d5cdead0b543bda6bbd11e7ce2933b4
Author: Sean Yang <[email protected]>
AuthorDate: Sun Sep 29 10:07:11 2024 +0800

    Http2 sse support (#14673)
---
 .../dubbo/remoting/http12/HttpConstants.java       |  7 ++++++
 .../dubbo/remoting/http12/HttpHeaderNames.java     |  2 ++
 .../apache/dubbo/remoting/http12/HttpMetadata.java |  5 +++++
 ...ver.java => Http1SseServerChannelObserver.java} | 19 ++++++----------
 .../remoting/http12/message/codec/JsonPbCodec.java |  3 ++-
 .../h2/NettyHttp2ProtocolSelectorHandler.java      |  5 +----
 .../main/java/org/apache/dubbo/rpc/Constants.java  |  1 -
 .../DefaultHttp11ServerTransportListener.java      |  8 +++----
 .../h12/http1/Http1UnaryServerChannelObserver.java |  2 +-
 .../http2/GenericHttp2ServerTransportListener.java |  9 ++++++--
 .../h12/http2/Http2SseServerChannelObserver.java   | 25 +++++++++-------------
 .../dubbo/rpc/protocol/tri/test/TestResponse.java  |  9 ++++++--
 .../rpc/protocol/tri/test/TestRunnerImpl.java      |  2 +-
 13 files changed, 54 insertions(+), 43 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
index a45e8fe396..ef6995b857 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
@@ -16,12 +16,16 @@
  */
 package org.apache.dubbo.remoting.http12;
 
+import java.nio.charset.StandardCharsets;
+
 public final class HttpConstants {
 
     public static final String TRAILERS = "trailers";
 
     public static final String CHUNKED = "chunked";
 
+    public static final String NO_CACHE = "no-cache";
+
     public static final String X_FORWARDED_PROTO = "x-forwarded-proto";
     public static final String X_FORWARDED_HOST = "x-forwarded-host";
     public static final String X_FORWARDED_PORT = "x-forwarded-port";
@@ -29,5 +33,8 @@ public final class HttpConstants {
     public static final String HTTPS = "https";
     public static final String HTTP = "http";
 
+    public static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES = 
"data:".getBytes(StandardCharsets.US_ASCII);
+    public static final byte[] SERVER_SENT_EVENT_LF_BYTES = 
"\n\n".getBytes(StandardCharsets.US_ASCII);
+
     private HttpConstants() {}
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
index 0abfb8658f..5272913d3e 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java
@@ -34,6 +34,8 @@ public enum HttpHeaderNames {
 
     
TRANSFER_ENCODING(io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING),
 
+    CACHE_CONTROL(io.netty.handler.codec.http.HttpHeaderNames.CACHE_CONTROL),
+
     LOCATION(io.netty.handler.codec.http.HttpHeaderNames.LOCATION),
 
     HOST(io.netty.handler.codec.http.HttpHeaderNames.HOST),
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
index acf8a5d171..48cdf45633 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpMetadata.java
@@ -27,4 +27,9 @@ public interface HttpMetadata {
     default String header(CharSequence name) {
         return headers().getFirst(name);
     }
+
+    default HttpMetadata header(CharSequence name, String value) {
+        headers().set(name, value);
+        return this;
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
similarity index 68%
copy from 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
copy to 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
index b401075e89..486af06e2a 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
@@ -19,40 +19,35 @@ package org.apache.dubbo.remoting.http12.h1;
 import org.apache.dubbo.remoting.http12.HttpChannel;
 import org.apache.dubbo.remoting.http12.HttpConstants;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
-import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.HttpOutputMessage;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
-public class Http1ServerStreamChannelObserver extends 
Http1ServerChannelObserver {
+public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
 
-    private static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES = 
"data:".getBytes(StandardCharsets.US_ASCII);
-    private static final byte[] SERVER_SENT_EVENT_LF_BYTES = 
"\n\n".getBytes(StandardCharsets.US_ASCII);
-
-    public Http1ServerStreamChannelObserver(HttpChannel httpChannel) {
+    public Http1SseServerChannelObserver(HttpChannel httpChannel) {
         super(httpChannel);
     }
 
     @Override
     protected HttpMetadata encodeHttpMetadata(boolean endStream) {
-        HttpHeaders headers = HttpHeaders.create();
-        headers.set(HttpHeaderNames.TRANSFER_ENCODING.getKey(), 
HttpConstants.CHUNKED);
-        return new Http1Metadata(headers);
+        return super.encodeHttpMetadata(endStream)
+                .header(HttpHeaderNames.TRANSFER_ENCODING.getKey(), 
HttpConstants.CHUNKED)
+                .header(HttpHeaderNames.CACHE_CONTROL.getKey(), 
HttpConstants.NO_CACHE);
     }
 
     @Override
     protected void preOutputMessage(HttpOutputMessage message) throws 
IOException {
         HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
-        prefixMessage.getBody().write(SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
+        
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
         getHttpChannel().writeMessage(prefixMessage);
     }
 
     @Override
     protected void postOutputMessage(HttpOutputMessage message) throws 
IOException {
         HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
-        lfMessage.getBody().write(SERVER_SENT_EVENT_LF_BYTES);
+        lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
         getHttpChannel().writeMessage(lfMessage);
     }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
index 725dc985e4..502f264e2f 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
@@ -47,7 +47,8 @@ public final class JsonPbCodec extends JsonCodec {
     public void encode(OutputStream os, Object data, Charset charset) throws 
EncodeException {
         try {
             if (data instanceof Message) {
-                String jsonString = JsonFormat.printer().print((Message) data);
+                String jsonString =
+                        
JsonFormat.printer().omittingInsignificantWhitespace().print((Message) data);
                 os.write(jsonString.getBytes(charset));
                 return;
             }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
index 9dc05b48d3..c23e176531 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
@@ -21,8 +21,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.common.utils.UrlUtils;
 import org.apache.dubbo.config.nested.TripleConfig;
-import org.apache.dubbo.remoting.http12.HttpHeaderNames;
-import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.command.HttpWriteQueue;
 import 
org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException;
@@ -64,8 +62,7 @@ public class NettyHttp2ProtocolSelectorHandler extends 
SimpleChannelInboundHandl
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata 
metadata) {
-        HttpHeaders headers = metadata.headers();
-        String contentType = 
headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
+        String contentType = metadata.contentType();
         Http2ServerTransportListenerFactory factory = 
UrlUtils.computeServiceAttribute(
                         url,
                         TRANSPORT_LISTENER_FACTORY_CACHE,
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index 6fee317668..d7512eeb88 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -115,7 +115,6 @@ public interface Constants {
     String H2_SETTINGS_SERVLET_ENABLED = 
"dubbo.protocol.triple.servlet.enabled";
     String H3_SETTINGS_HTTP3_ENABLED = "dubbo.protocol.triple.http3.enabled";
     String H3_SETTINGS_HTTP3_NEGOTIATION = 
"dubbo.protocol.triple.http3.negotiation";
-    String H2_SETTINGS_CONNECTION_INITIAL_WINDOW_SIZE_KEY = 
"dubbo.rpc.tri.connection-initial-window-size";
 
     String ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY = "lb_adaptive";
     String ADAPTIVE_LOADBALANCE_START_TIME = "adaptive_startTime";
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index e59cb07421..5e03be4c40 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -24,8 +24,8 @@ import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpInputMessage;
 import org.apache.dubbo.remoting.http12.RequestMetadata;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
-import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
+import org.apache.dubbo.remoting.http12.h1.Http1SseServerChannelObserver;
 import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
@@ -81,9 +81,9 @@ public class DefaultHttp11ServerTransportListener
             case UNARY:
                 return new AutoCompleteUnaryServerCallListener(invocation, 
invoker, responseObserver);
             case SERVER_STREAM:
-                responseObserver = prepareResponseObserver(new 
Http1ServerStreamChannelObserver(httpChannel));
+                responseObserver = prepareResponseObserver(new 
Http1SseServerChannelObserver(httpChannel));
                 responseObserver.addHeadersCustomizer((hs, t) ->
-                        hs.set(HttpHeaderNames.CONTENT_TYPE.getName(), 
MediaType.TEXT_EVENT_STREAM.getName()));
+                        hs.set(HttpHeaderNames.CONTENT_TYPE.getKey(), 
MediaType.TEXT_EVENT_STREAM.getName()));
                 return new 
AutoCompleteServerStreamServerCallListener(invocation, invoker, 
responseObserver);
             default:
                 throw new UnsupportedOperationException("HTTP1.x only support 
unary and server-stream");
@@ -104,7 +104,7 @@ public class DefaultHttp11ServerTransportListener
     protected void initializeAltSvc(URL url) {
         String protocolId = Http3Exchanger.isEnabled(url) ? "h3" : "h2";
         String value = protocolId + "=\":" + 
url.getParameter(Constants.BIND_PORT_KEY, url.getPort()) + '"';
-        responseObserver.addHeadersCustomizer((hs, t) -> 
hs.set(HttpHeaderNames.ALT_SVC.getName(), value));
+        responseObserver.addHeadersCustomizer((hs, t) -> 
hs.set(HttpHeaderNames.ALT_SVC.getKey(), value));
     }
 
     private static final class AutoCompleteUnaryServerCallListener extends 
UnaryServerCallListener {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
index 5b543957a2..1aa994de52 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1UnaryServerChannelObserver.java
@@ -66,7 +66,7 @@ public final class Http1UnaryServerChannelObserver extends 
Http1ServerChannelObs
                 throw new IllegalArgumentException("Unsupported body type: " + 
body.getClass());
             }
         }
-        headers.set(HttpHeaderNames.CONTENT_LENGTH.getName(), 
String.valueOf(contentLength));
+        headers.set(HttpHeaderNames.CONTENT_LENGTH.getKey(), 
String.valueOf(contentLength));
     }
 
     @Override
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 9f395c5507..f4f0aeefb3 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -28,6 +28,7 @@ import 
org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
 import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
 import org.apache.dubbo.remoting.http12.message.DefaultStreamingDecoder;
 import org.apache.dubbo.remoting.http12.message.ListeningDecoder;
+import org.apache.dubbo.remoting.http12.message.MediaType;
 import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
 import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
 import org.apache.dubbo.rpc.Invoker;
@@ -71,7 +72,11 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
     }
 
     protected Http2ServerChannelObserver 
newStreamResponseObserver(H2StreamChannel h2StreamChannel) {
-        return new Http2StreamServerChannelObserver(getFrameworkModel(), 
h2StreamChannel);
+        Http2ServerChannelObserver responseObserver =
+                new Http2SseServerChannelObserver(getFrameworkModel(), 
h2StreamChannel);
+        responseObserver.addHeadersCustomizer(
+                (hs, t) -> hs.set(HttpHeaderNames.CONTENT_TYPE.getKey(), 
MediaType.TEXT_EVENT_STREAM.getName()));
+        return responseObserver;
     }
 
     protected Http2ServerChannelObserver 
prepareResponseObserver(Http2ServerChannelObserver responseObserver) {
@@ -124,7 +129,7 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
     protected void initializeAltSvc(URL url) {
         if (Http3Exchanger.isEnabled(url)) {
             String value = "h3=\":" + 
url.getParameter(Constants.BIND_PORT_KEY, url.getPort()) + '"';
-            responseObserver.addHeadersCustomizer((hs, t) -> 
hs.set(HttpHeaderNames.ALT_SVC.getName(), value));
+            responseObserver.addHeadersCustomizer((hs, t) -> 
hs.set(HttpHeaderNames.ALT_SVC.getKey(), value));
         }
     }
 
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
similarity index 63%
rename from 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
rename to 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index b401075e89..b36dc5e353 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -14,45 +14,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.remoting.http12.h1;
+package org.apache.dubbo.rpc.protocol.tri.h12.http2;
 
-import org.apache.dubbo.remoting.http12.HttpChannel;
 import org.apache.dubbo.remoting.http12.HttpConstants;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
-import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
-public class Http1ServerStreamChannelObserver extends 
Http1ServerChannelObserver {
+public final class Http2SseServerChannelObserver extends 
Http2StreamServerChannelObserver {
 
-    private static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES = 
"data:".getBytes(StandardCharsets.US_ASCII);
-    private static final byte[] SERVER_SENT_EVENT_LF_BYTES = 
"\n\n".getBytes(StandardCharsets.US_ASCII);
-
-    public Http1ServerStreamChannelObserver(HttpChannel httpChannel) {
-        super(httpChannel);
+    public Http2SseServerChannelObserver(FrameworkModel frameworkModel, 
H2StreamChannel h2StreamChannel) {
+        super(frameworkModel, h2StreamChannel);
     }
 
     @Override
     protected HttpMetadata encodeHttpMetadata(boolean endStream) {
-        HttpHeaders headers = HttpHeaders.create();
-        headers.set(HttpHeaderNames.TRANSFER_ENCODING.getKey(), 
HttpConstants.CHUNKED);
-        return new Http1Metadata(headers);
+        return super.encodeHttpMetadata(endStream)
+                .header(HttpHeaderNames.CACHE_CONTROL.getKey(), 
HttpConstants.NO_CACHE);
     }
 
     @Override
     protected void preOutputMessage(HttpOutputMessage message) throws 
IOException {
         HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
-        prefixMessage.getBody().write(SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
+        
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
         getHttpChannel().writeMessage(prefixMessage);
     }
 
     @Override
     protected void postOutputMessage(HttpOutputMessage message) throws 
IOException {
         HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
-        lfMessage.getBody().write(SERVER_SENT_EVENT_LF_BYTES);
+        lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
         getHttpChannel().writeMessage(lfMessage);
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
index 3d42580293..44470d9e41 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageDecoder;
+import org.apache.dubbo.remoting.http12.message.MediaType;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -81,8 +82,12 @@ public class TestResponse {
         List<T> bodies = (List<T>) this.bodies;
         if (bodies == null) {
             bodies = new ArrayList<>(oss.size());
-            for (OutputStream os : oss) {
-                ByteArrayOutputStream bos = (ByteArrayOutputStream) os;
+            boolean isTextEvent = 
MediaType.TEXT_EVENT_STREAM.getName().equals(getContentType());
+            for (int i = 0, size = oss.size(); i < size; i++) {
+                if (isTextEvent && i % 3 != 1) {
+                    continue;
+                }
+                ByteArrayOutputStream bos = (ByteArrayOutputStream) oss.get(i);
                 if (bos.size() == 0) {
                     bodies.add(null);
                 } else {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
index c5df5c2db9..439b96fb32 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestRunnerImpl.java
@@ -92,7 +92,7 @@ final class TestRunnerImpl implements TestRunner {
     }
 
     @Override
-    @SuppressWarnings({"unchecked", "resource"})
+    @SuppressWarnings("unchecked")
     public TestResponse run(TestRequest request) {
         MockH2StreamChannel channel = new MockH2StreamChannel();
         URL url = new URL(TestProtocol.NAME, TestProtocol.HOST, 
TestProtocol.PORT, request.getProviderParams());

Reply via email to