This is an automated email from the ASF dual-hosted git repository.

youling1128 pushed a commit to branch 2.9.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/2.9.x by this push:
     new cacfb7924 [#4873] Support for SSE interface RPC calls (#4875)
cacfb7924 is described below

commit cacfb79249536f152d341e174c191472f2eb5c4b
Author: Alex <[email protected]>
AuthorDate: Wed Aug 20 20:35:55 2025 +0800

    [#4873] Support for SSE interface RPC calls (#4875)
---
 .github/workflows/maven.yml                        |   2 +-
 .../common/rest/AbstractRestInvocation.java        |   2 +-
 .../common/rest/RestProducerInvocationFlow.java    |  10 +-
 .../codec/produce/ProduceEventStreamProcessor.java | 412 +++++++++++++++++++++
 .../common/rest/definition/RestOperationMeta.java  |   8 +
 .../rest/filter/inner/RestServerCodecFilter.java   |   4 +-
 .../rest/filter/inner/ServerRestArgsFilter.java    |  92 ++++-
 ...comb.common.rest.codec.produce.ProduceProcessor |   3 +-
 .../common/rest/TestAbstractRestInvocation.java    |   4 +-
 .../produce/TestProduceEventStreamProcessor.java   | 315 ++++++++++++++++
 demo/demo-cse-v2/gateway/pom.xml                   |  14 +
 demo/demo-cse-v2/provider/pom.xml                  |   4 +
 .../org/apache/servicecomb/demo/model/Model.java   |  37 +-
 .../springmvc/client/ReactiveStreamIT.java         | 205 ++++++++++
 .../springmvc/client/ThirdSvcConfiguration.java    |  53 +++
 .../springmvc/server/ReactiveStreamController.java |  73 ++++
 dependencies/default/pom.xml                       |  17 +
 foundations/foundation-vertx/pom.xml               |   4 +
 .../vertx/client/tcp/TcpClientConnection.java      |   2 +-
 .../vertx/http/AbstractHttpServletResponse.java    |  13 +-
 .../vertx/http/HttpServletResponseEx.java          |  15 +
 .../vertx/http/StandardHttpServletResponseEx.java  |  14 +-
 .../VertxClientResponseToHttpServletResponse.java  |  13 +
 .../VertxServerResponseToHttpServletResponse.java  |  26 +-
 .../vertx/stream/BufferOutputStream.java           |  36 +-
 .../foundation/vertx/tcp/TcpConnection.java        |   9 +-
 .../servicecomb/foundation/vertx/TestStream.java   |   2 +-
 .../vertx/client/tcp/TestTcpClientConnection.java  |   8 +-
 .../http/TestAbstractHttpServletResponse.java      |   3 +-
 .../http/TestStandardHttpServletResponseEx.java    |   6 +-
 ...stVertxServerResponseToHttpServletResponse.java |   8 +-
 swagger/swagger-generator/generator-core/pom.xml   |   4 +
 .../processor/response/PublisherProcessor.java     |  50 +++
 ...icecomb.swagger.generator.ResponseTypeProcessor |   1 +
 swagger/swagger-invocation/invocation-core/pom.xml |   8 +
 .../consumer/PublisherConsumerResponseMapper.java  |  33 +-
 .../PublisherConsumerResponseMapperFactory.java    |  43 +++
 .../producer/PublisherProducerResponseMapper.java  |  42 +++
 .../PublisherProducerResponseMapperFactory.java    |  43 +++
 .../invocation/sse/SseEventResponseEntity.java     | 111 ++++++
 .../sse/SseEventResponseEntityProcessor.java       |  28 +-
 ...icecomb.swagger.generator.ResponseTypeProcessor |   1 +
 ...response.consumer.ConsumerResponseMapperFactory |   1 +
 ...response.producer.ProducerResponseMapperFactory |   1 +
 .../highway/HighwayProducerInvocationFlow.java     |   2 +-
 .../transport/highway/HighwayServerInvoke.java     |   2 +-
 .../transport-rest/transport-rest-client/pom.xml   |   4 +
 .../rest/client/http/DefaultHttpClientFilter.java  |  33 +-
 .../rest/client/http/RestClientInvocation.java     |  35 +-
 .../client/http/TestDefaultHttpClientFilter.java   |   8 +
 50 files changed, 1749 insertions(+), 115 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 3d5f372d7..5788af888 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -36,7 +36,7 @@ jobs:
     - name: Set up jdk
       uses: actions/setup-java@v3
       with:
-        java-version: '21'
+        java-version: '17'
         distribution: 'temurin'
     - name: Set up Maven
       uses: stCarolas/[email protected]
diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 1f3d18766..ede3e94d4 100644
--- 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -351,7 +351,7 @@ public abstract class AbstractRestInvocation {
 
     if (!(response.getResult() instanceof ServerWebSocket)) {
       try {
-        responseEx.flushBuffer();
+        responseEx.endResponse();
       } catch (Throwable flushException) {
         LOGGER.error("Failed to flush rest response, operation:{}, request 
uri:{}",
             getMicroserviceQualifiedName(), requestEx.getRequestURI(), 
flushException);
diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
index 70c3a5a3a..673e45ee2 100644
--- 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
@@ -53,7 +53,7 @@ public class RestProducerInvocationFlow extends 
ProducerInvocationFlow {
           requestEx.getRequestURI(), e);
     }
 
-    flushResponse("UNKNOWN_OPERATION");
+    endResponse("UNKNOWN_OPERATION");
     return null;
   }
 
@@ -61,16 +61,16 @@ public class RestProducerInvocationFlow extends 
ProducerInvocationFlow {
   protected void sendResponse(Invocation invocation, Response response) {
     if (isDownloadFileResponseType(invocation, response)) {
       responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()))
-          .whenComplete((r, e) -> 
flushResponse(invocation.getMicroserviceQualifiedName()));
+          .whenComplete((r, e) -> 
endResponse(invocation.getMicroserviceQualifiedName()));
       return;
     }
 
-    flushResponse(invocation.getMicroserviceQualifiedName());
+    endResponse(invocation.getMicroserviceQualifiedName());
   }
 
-  private void flushResponse(String operationName) {
+  private void endResponse(String operationName) {
     try {
-      responseEx.flushBuffer();
+      responseEx.endResponse();
     } catch (Throwable flushException) {
       LOGGER.error("Failed to flush rest response, operation:{}, request 
uri:{}",
           operationName, requestEx.getRequestURI(), flushException);
diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
new file mode 100644
index 000000000..109a460cd
--- /dev/null
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.common.rest.codec.produce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
+import org.apache.servicecomb.foundation.vertx.stream.BufferInputStream;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.reactivex.rxjava3.core.Flowable;
+import io.vertx.core.buffer.Buffer;
+import jakarta.ws.rs.core.MediaType;
+
+public class ProduceEventStreamProcessor implements ProduceProcessor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProduceEventStreamProcessor.class);
+
+  private static final String CR_STR = "\r";
+
+  private static final byte[] CR = CR_STR.getBytes(StandardCharsets.UTF_8);
+
+  private static final String LF_STR = "\n";
+
+  private static final byte[] LF = LF_STR.getBytes(StandardCharsets.UTF_8);
+
+  private static final String CRLF_STR = "\r\n";
+
+  private static final byte[] CRLF = CRLF_STR.getBytes(StandardCharsets.UTF_8);
+
+  private String lineDelimiter;
+
+  private byte[] lineDelimiterBytes;
+
+  private int writeIndex = 0;
+
+  @Override
+  public String getName() {
+    return MediaType.SERVER_SENT_EVENTS;
+  }
+
+  @Override
+  public int getOrder() {
+    return 0;
+  }
+
+  @Override
+  public void doEncodeResponse(OutputStream output, Object result) throws 
Exception {
+    StringBuilder eventBuilder = new StringBuilder();
+    if (result instanceof SseEventResponseEntity<?> responseEntity) {
+      appendId(eventBuilder, responseEntity.getId());
+      appendEvent(eventBuilder, responseEntity.getEvent());
+      appendRetry(eventBuilder, responseEntity.getRetry());
+      appendData(eventBuilder, responseEntity.getData());
+      eventBuilder.append("\n");
+      output.write(eventBuilder.toString().getBytes(StandardCharsets.UTF_8));
+    } else {
+      LOGGER.warn("Does not support encoding objects other than 
SseEventResponseEntity!");
+    }
+  }
+
+  private enum ProcessStatus {
+    DETERMINE_LINE_DELIMITER,
+    MATCHING_CR,
+    MATCHING_LF,
+    MATCHING_CRLF,
+    MATCHING_LINE,
+    END_OF_MESSAGE,
+    /**
+     * The whole SSE stream is closed.
+     * Be careful: there may be remaining buffer should be processed.
+     */
+    END_OF_STREAM
+  }
+
+  private ProcessStatus loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
+
+  private int matchingDelimiterIndex = 0;
+
+  final ByteBuf buffer = Unpooled.buffer();
+
+  private SseEventResponseEntity<?> currentEntity = new 
SseEventResponseEntity<>();
+
+  private List<SseEventResponseEntity<?>> entityList = new ArrayList<>();
+
+  private JavaType type;
+
+  @Override
+  public List<SseEventResponseEntity<?>> doDecodeResponse(InputStream input, 
JavaType type) throws Exception {
+    this.type = type;
+    final byte[] readCache = new byte[Math.min(128, input.available())];
+    int bytesRead;
+    while ((bytesRead = input.read(readCache)) > 0) {
+      processAllBytes(readCache, bytesRead);
+    }
+    final List<SseEventResponseEntity<?>> resultList = entityList;
+    entityList = new ArrayList<>();
+    return resultList;
+  }
+
+  private void processAllBytes(byte[] readCache, int cacheEndPos) {
+    int lastProcessedPosition = innerLoop(readCache, 0, cacheEndPos);
+    while (lastProcessedPosition < cacheEndPos) {
+      lastProcessedPosition = innerLoop(readCache, lastProcessedPosition, 
cacheEndPos);
+    }
+  }
+
+  private int innerLoop(final byte[] readCache, final int startPos, final int 
cacheEndPos) {
+    if (startPos >= cacheEndPos) {
+      return cacheEndPos;
+    }
+    switch (loopStatus) {
+      case MATCHING_CR -> {
+        return tryToMatchDelimiterCR(readCache, startPos, cacheEndPos);
+      }
+      case MATCHING_CRLF -> {
+        return tryToMatchDelimiterCRLF(readCache, startPos, cacheEndPos);
+      }
+      case MATCHING_LF -> {
+        return tryToMatchDelimiterLF(readCache, startPos, cacheEndPos);
+      }
+      case DETERMINE_LINE_DELIMITER -> {
+        return searchFirstLineDelimiter(readCache, startPos, cacheEndPos);
+      }
+      case MATCHING_LINE -> {
+        return bufferReadCacheAndProcessLines(readCache, startPos, 
cacheEndPos);
+      }
+      case END_OF_STREAM -> {
+        return processLeftBuffer(cacheEndPos);
+      }
+      default -> throw new IllegalStateException("unexpected case");
+    }
+  }
+
+  private int processLeftBuffer(int cacheEndPos) {
+    final byte[] bytes = readAllBytesFromBuffer(buffer);
+    final String bufferStr = new String(bytes, StandardCharsets.UTF_8);
+    processStringBuffer(bufferStr);
+    return cacheEndPos;
+  }
+
+  private int bufferReadCacheAndProcessLines(byte[] readCache, int startPos, 
int cacheEndPos) {
+    buffer.writeBytes(readCache, startPos, cacheEndPos - startPos);
+    processAllAvailableBufferLines();
+    return cacheEndPos;
+  }
+
+  private int tryToMatchDelimiterCR(byte[] readCache, int startPos, int 
cacheEndPos) {
+    int bytesProcessed = 0;
+    for (; matchingDelimiterIndex < CR.length && startPos + bytesProcessed < 
cacheEndPos; ++bytesProcessed) {
+      if (readCache[startPos + bytesProcessed] == CR[matchingDelimiterIndex]) {
+        buffer.writeByte(readCache[startPos + bytesProcessed]);
+        ++matchingDelimiterIndex;
+      } else {
+        loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
+        matchingDelimiterIndex = 0;
+        return startPos + bytesProcessed;
+      }
+    }
+    if (matchingDelimiterIndex == CR.length) {
+      // matched all CR bytes, attempting to further match CRLF.
+      loopStatus = ProcessStatus.MATCHING_CRLF;
+    }
+    return startPos + bytesProcessed;
+  }
+
+  private int tryToMatchDelimiterCRLF(byte[] readCache, int startPos, int 
cacheEndPos) {
+    // If you enter this branch, it means that at least CR should be used as 
the line break character.
+    int bytesProcessed = 0;
+    for (; matchingDelimiterIndex < CRLF.length && startPos + bytesProcessed < 
cacheEndPos; ++bytesProcessed) {
+      if (readCache[startPos + bytesProcessed] == 
CRLF[matchingDelimiterIndex]) {
+        buffer.writeByte(readCache[startPos + bytesProcessed]);
+        ++matchingDelimiterIndex;
+      } else {
+        determineDelimiter(CR_STR, CR);
+        return startPos + bytesProcessed;
+      }
+    }
+    if (matchingDelimiterIndex == CRLF.length) {
+      determineDelimiter(CRLF_STR, CRLF);
+    }
+    return startPos + bytesProcessed;
+  }
+
+  private int tryToMatchDelimiterLF(byte[] readCache, int startPos, int 
cacheEndPos) {
+    int bytesProcessed = 0;
+    for (; matchingDelimiterIndex < LF.length && startPos + bytesProcessed < 
cacheEndPos; ++bytesProcessed) {
+      if (readCache[startPos + bytesProcessed] == LF[matchingDelimiterIndex]) {
+        buffer.writeByte(readCache[startPos + bytesProcessed]);
+        ++matchingDelimiterIndex;
+      } else {
+        loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
+        matchingDelimiterIndex = 0;
+        return startPos + bytesProcessed;
+      }
+    }
+    if (matchingDelimiterIndex == LF.length) {
+      determineDelimiter(LF_STR, LF);
+    }
+    return startPos + bytesProcessed;
+  }
+
+  private void determineDelimiter(String delimiterStr, byte[] delimiterBytes) {
+    lineDelimiter = delimiterStr;
+    lineDelimiterBytes = delimiterBytes;
+    matchingDelimiterIndex = 0;
+    loopStatus = ProcessStatus.MATCHING_LINE;
+  }
+
+  private int searchFirstLineDelimiter(byte[] readCache, int startPos, int 
cacheEndPos) {
+    for (int i = startPos; i < cacheEndPos; ++i) {
+      if (readCache[i] == CR[0]) {
+        loopStatus = ProcessStatus.MATCHING_CR;
+        matchingDelimiterIndex = 0;
+        return i;
+      } else if (readCache[i] == LF[0]) {
+        loopStatus = ProcessStatus.MATCHING_LF;
+        matchingDelimiterIndex = 0;
+        return i;
+      } else {
+        buffer.writeByte(readCache[i]);
+      }
+    }
+    return cacheEndPos;
+  }
+
+  private void processAllAvailableBufferLines() {
+    while (buffer.readableBytes() > 0) {
+      final byte[] bytes = readALineOfBytesFromBuffer(buffer);
+      if (bytes == null || bytes.length == 0) {
+        return;
+      }
+      final String bufferStr = new String(bytes, StandardCharsets.UTF_8);
+      processStringBuffer(bufferStr);
+    }
+  }
+
+  private void processStringBuffer(String bufferStr) {
+    int cursor = 0;
+    int delimiterIdx;
+    while ((delimiterIdx = bufferStr.indexOf(lineDelimiter, cursor)) >= 0) {
+      final String line = bufferStr.substring(cursor, delimiterIdx);
+      processStringLine(line);
+      cursor = delimiterIdx + lineDelimiter.length();
+    }
+    if (cursor < bufferStr.length()) {
+      
buffer.writeBytes(bufferStr.substring(cursor).getBytes(StandardCharsets.UTF_8));
+    }
+  }
+
+  private void processStringLine(String line) {
+    if (StringUtils.isBlank(line)) {
+      if (currentEntity.isEmpty()) {
+        return;
+      }
+      entityList.add(currentEntity);
+      currentEntity = new SseEventResponseEntity<>();
+      return;
+    }
+    final String[] split = line.split(":", 2);
+    if (split.length < 2) {
+      LOGGER.error("get a line of sse event without colon! stream is 
breaking!");
+      throw new IllegalStateException("get a line of sse event without 
colon!");
+    }
+    switch (split[0]) {
+      case "event" -> {
+        if (StringUtils.isNotBlank(split[1])) {
+          currentEntity.event(split[1].trim());
+        }
+      }
+      case "id" -> {
+        if (StringUtils.isNotBlank(split[1])) {
+          currentEntity.id(Integer.parseInt(split[1].trim()));
+        }
+      }
+      case "data" -> {
+        try {
+          
currentEntity.data(RestObjectMapperFactory.getRestObjectMapper().readValue(split[1].trim(),
 type));
+        } catch (JsonProcessingException e) {
+          LOGGER.error("failed to process data of sse event: [{}]", 
e.getMessage());
+          throw new IllegalStateException("failed to process data of sse 
event", e);
+        }
+      }
+      case "retry" -> {
+        if (StringUtils.isNotBlank(split[1])) {
+          currentEntity.retry(Long.parseLong(split[1].trim()));
+        }
+      }
+      default -> {
+        LOGGER.debug("unrecognized sse message line! ignored string segment 
length=[{}]", line.length());
+      }
+    }
+  }
+
+  private byte[] readALineOfBytesFromBuffer(ByteBuf buffer) {
+    matchingDelimiterIndex = 0;
+    try (final ByteArrayOutputStream bos = new 
ByteArrayOutputStream(buffer.readableBytes())) {
+      while (buffer.readableBytes() > 0 && matchingDelimiterIndex < 
lineDelimiterBytes.length) {
+        final byte b = buffer.readByte();
+        if (b == lineDelimiterBytes[matchingDelimiterIndex]) {
+          ++matchingDelimiterIndex;
+        }
+        bos.write(b);
+      }
+      if (matchingDelimiterIndex < lineDelimiterBytes.length) {
+        // The newline character was not matched, so this part of the buffer 
does not constitute a complete line of
+        // content and needs to remain in the buffer, waiting for the next 
segment to arrive for processing.
+        buffer.writeBytes(bos.toByteArray());
+        return null;
+      }
+      matchingDelimiterIndex = 0;
+      return bos.toByteArray();
+    } catch (IOException e) {
+      throw new IllegalStateException("impossible error while closing 
ByteArrayOutputStream", e);
+    }
+  }
+
+  private byte[] readAllBytesFromBuffer(ByteBuf buffer) {
+    try (final ByteArrayOutputStream bos = new 
ByteArrayOutputStream(buffer.readableBytes())) {
+      buffer.readBytes(bos, buffer.readableBytes());
+      return bos.toByteArray();
+    } catch (IOException e) {
+      throw new IllegalStateException("impossible error while closing 
ByteArrayOutputStream", e);
+    }
+  }
+
+  private void appendId(StringBuilder eventBuilder, Integer eventId) {
+    int id = eventId != null ? eventId : writeIndex++;
+    eventBuilder.append("id: ").append(id).append("\n");
+  }
+
+  private void appendEvent(StringBuilder eventBuilder, String event) {
+    if (StringUtils.isEmpty(event)) {
+      return;
+    }
+    eventBuilder.append("event: ").append(event).append("\n");
+  }
+
+  private void appendRetry(StringBuilder eventBuilder, Long retry) {
+    if (retry == null) {
+      return;
+    }
+    eventBuilder.append("retry: ").append(retry.longValue()).append("\n");
+  }
+
+  private void appendData(StringBuilder eventBuilder, List<?> datas) throws 
Exception {
+    if (CollectionUtils.isEmpty(datas)) {
+      throw new Exception("sse response data is null!");
+    }
+    for (Object data : datas) {
+      eventBuilder.append("data: ")
+          
.append(RestObjectMapperFactory.getRestObjectMapper().writeValueAsString(data))
+          .append("\n");
+    }
+  }
+
+  @Override
+  public Publisher<SseEventResponseEntity<?>> decodeResponse(Buffer buffer, 
JavaType type) throws Exception {
+    if (buffer.length() == 0) {
+      return Flowable.empty();
+    }
+
+    try (BufferInputStream input = new BufferInputStream(buffer.getByteBuf())) 
{
+      final List<SseEventResponseEntity<?>> list = doDecodeResponse(input, 
type);
+      return Flowable.fromIterable(list);
+    }
+  }
+
+  public Publisher<SseEventResponseEntity<?>> close() throws Exception {
+    if (type == null) {
+      return Flowable.empty();
+    }
+    try (final ByteArrayInputStream input = new ByteArrayInputStream(
+        (lineDelimiter + lineDelimiter).getBytes(StandardCharsets.UTF_8))) {
+      // Write two additional newline characters into the buffer to ensure 
that the processor completes
+      // processing all remaining content in the buffer.
+      final List<SseEventResponseEntity<?>> list = doDecodeResponse(input, 
type);
+      return Flowable.fromIterable(list);
+    }
+  }
+}
diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
index 5061ef783..9da6d06f3 100644
--- 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
@@ -88,6 +88,8 @@ public class RestOperationMeta {
   // 快速构建URL path
   private URLPathBuilder pathBuilder;
 
+  protected static final String EVENTS_MEDIA_TYPE = 
MediaType.SERVER_SENT_EVENTS;
+
   public void init(OperationMeta operationMeta) {
     this.operationMeta = operationMeta;
 
@@ -258,6 +260,12 @@ public class RestOperationMeta {
           
ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass));
     } else {
       for (String produce : produces) {
+        if (produce.contains(EVENTS_MEDIA_TYPE)) {
+          // When the produce type is event-stream, the 
ProduceEventStreamProcessor implementation class corresponding
+          // to event-stream is not added, and it is set to the default type 
ProduceJsonProcessor.
+          // In case of an exception, the response result is parsed.
+          continue;
+        }
         if (produce.contains(";")) {
           produce = produce.substring(0, produce.indexOf(";"));
         }
diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
index eaf62b632..f556ffa05 100644
--- 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
@@ -44,8 +44,8 @@ import 
org.apache.servicecomb.foundation.vertx.stream.BufferOutputStream;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.springframework.stereotype.Component;
 
-import io.netty.buffer.Unpooled;
 import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
 
 @Component
 public class RestServerCodecFilter implements ProducerFilter {
@@ -103,7 +103,7 @@ public class RestServerCodecFilter implements 
ProducerFilter {
     }
 
     responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
-    try (BufferOutputStream output = new 
BufferOutputStream(Unpooled.compositeBuffer())) {
+    try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
       produceProcessor.encodeResponse(output, response.getResult());
 
       responseEx.setBodyBuffer(output.getBuffer());
diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
index bef5b48a4..281769d75 100644
--- 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
@@ -19,11 +19,13 @@ package org.apache.servicecomb.common.rest.filter.inner;
 
 import static 
org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter.isDownloadFileResponseType;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.servicecomb.common.rest.RestConst;
 import org.apache.servicecomb.common.rest.codec.RestCodec;
+import 
org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor;
 import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
 import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
 import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
@@ -36,15 +38,22 @@ import 
org.apache.servicecomb.foundation.vertx.stream.BufferOutputStream;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.netflix.config.DynamicPropertyFactory;
 
-import io.netty.buffer.Unpooled;
+import io.vertx.core.buffer.Buffer;
 
 public class ServerRestArgsFilter implements HttpServerFilter {
   private static final boolean enabled = 
DynamicPropertyFactory.getInstance().getBooleanProperty
       ("servicecomb.http.filter.server.serverRestArgs.enabled", true).get();
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerRestArgsFilter.class);
+
   @Override
   public int getOrder() {
     return -100;
@@ -74,10 +83,15 @@ public class ServerRestArgsFilter implements 
HttpServerFilter {
       return responseEx.sendPart(PartUtils.getSinglePart(null, 
response.getResult()));
     }
 
-    responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
+    if (isServerSendEvent(response)) {
+      produceProcessor = new ProduceEventStreamProcessor();
+      responseEx.setContentType(produceProcessor.getName() + "; 
charset=utf-8");
+      return writeServerSendEvent(invocation, response, produceProcessor, 
responseEx);
+    }
 
+    responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
     CompletableFuture<Void> future = new CompletableFuture<>();
-    try (BufferOutputStream output = new 
BufferOutputStream(Unpooled.compositeBuffer())) {
+    try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
       if (failed) {
         produceProcessor.encodeResponse(output, ((InvocationException) 
response.getResult()).getErrorData());
       } else {
@@ -91,4 +105,76 @@ public class ServerRestArgsFilter implements 
HttpServerFilter {
     }
     return future;
   }
+
+  public static boolean isServerSendEvent(Response response) {
+    return response.getResult() instanceof Publisher<?>;
+  }
+
+  private static CompletableFuture<Void> writeServerSendEvent(Invocation 
invocation, Response response,
+      ProduceProcessor produceProcessor, HttpServletResponseEx responseEx) {
+    responseEx.setChunkedForEvent(true);
+    CompletableFuture<Void> result = new CompletableFuture<>();
+    Publisher<?> publisher = response.getResult();
+    publisher.subscribe(new Subscriber<Object>() {
+      Subscription subscription;
+
+      @Override
+      public void onSubscribe(Subscription s) {
+        s.request(1);
+        subscription = s;
+      }
+
+      @Override
+      public void onNext(Object o) {
+        try {
+          writeResponse(responseEx, produceProcessor, o, 
response).whenComplete((r, e) -> {
+            if (e != null) {
+              subscription.cancel();
+              result.completeExceptionally(e);
+              return;
+            }
+            subscription.request(1);
+          });
+        } catch (Throwable e) {
+          LOGGER.warn("Failed to subscribe event: {}", o, e);
+          result.completeExceptionally(e);
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        result.completeExceptionally(t);
+      }
+
+      @Override
+      public void onComplete() {
+        result.complete(null);
+      }
+    });
+    return result;
+  }
+
+  private static CompletableFuture<Response> writeResponse(
+      HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, 
Object data, Response response) {
+    try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
+      produceProcessor.encodeResponse(output, data);
+      CompletableFuture<Response> result = new CompletableFuture<>();
+      responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> {
+        if (e != null) {
+          result.completeExceptionally(e);
+        }
+        try {
+          responseEx.flushBuffer();
+        } catch (IOException ex) {
+          LOGGER.warn("Failed to flush buffer for Server Send Events", ex);
+        }
+      });
+      result.complete(response);
+      return result;
+    } catch (Throwable e) {
+      LOGGER.error("internal service error must be fixed.", e);
+      responseEx.setStatus(500);
+      return CompletableFuture.failedFuture(e);
+    }
+  }
 }
diff --git 
a/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
 
b/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
index 9353b9dce..a05c6126d 100644
--- 
a/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
+++ 
b/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
@@ -16,4 +16,5 @@
 #
 
 org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor
-org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
\ No newline at end of file
+org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
+org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor
\ No newline at end of file
diff --git 
a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
 
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index 0d327ae06..f0ac33456 100644
--- 
a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ 
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -916,7 +916,7 @@ public class TestAbstractRestInvocation {
       }
 
       @Override
-      public void flushBuffer() {
+      public void endResponse() {
         endCount.value = endCount.value + 1;
       }
 
@@ -956,7 +956,7 @@ public class TestAbstractRestInvocation {
       }
 
       @Override
-      public void flushBuffer() {
+      public void endResponse() {
         endCount.value = endCount.value + 1;
       }
 
diff --git 
a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java
 
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java
new file mode 100644
index 000000000..c1cbf95b9
--- /dev/null
+++ 
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.common.rest.codec.produce;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JavaType;
+
+import io.reactivex.rxjava3.core.Flowable;
+
+/**
+ * Test ProduceEventStreamProcessor
+ *
+ * @since 2025-08-19
+ */
+public class TestProduceEventStreamProcessor {
+  @Test
+  public void doDecodeResponse() throws Exception {
+    doDecodeResponseTemplateComposite("\n");
+    doDecodeResponseTemplateComposite("\r");
+    doDecodeResponseTemplateComposite("\r\n");
+  }
+
+  private void doDecodeResponseTemplateComposite(String lineDelimiter) throws 
Exception {
+    doDecodeResponseTemplate(lineDelimiter, 2);
+    doDecodeResponseTemplate(lineDelimiter, 1);
+    doDecodeResponseTemplate(lineDelimiter, 0);
+  }
+
+  private void doDecodeResponseTemplate(String lineDelimiter, int 
delimiterInTheFirstSegment) throws Exception {
+    final ProduceEventStreamProcessor processor = new 
ProduceEventStreamProcessor();
+    final ByteArrayInputStream stream0 = prepareStream(
+        "id: 0" + lineDelimiter + "data: \"aaa\"" + 
lineDelimiter.repeat(delimiterInTheFirstSegment));
+    final Object o0 = processor.doDecodeResponse(stream0,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    if (delimiterInTheFirstSegment > 1) {
+      checkDecodeResult(o0, new ItemChecker(0, 
Collections.singletonList("aaa"), null, null));
+    } else {
+      checkDecodeResult(o0);
+    }
+
+    final ByteArrayInputStream stream1 = prepareStream(
+        lineDelimiter.repeat(2 - delimiterInTheFirstSegment) + "id: 1" + 
lineDelimiter + "data: \"bbb\""
+            + lineDelimiter);
+    final Object o1 = processor.doDecodeResponse(stream1,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    if (delimiterInTheFirstSegment > 1) {
+      checkDecodeResult(o1);
+    } else {
+      checkDecodeResult(o1, new ItemChecker(0, 
Collections.singletonList("aaa"), null, null));
+    }
+
+    final ByteArrayInputStream stream2 = prepareStream(
+        lineDelimiter + "id: 2" + lineDelimiter + "data: \"ccc\"" + 
lineDelimiter + "event: test" + lineDelimiter
+            + "retry: 123");
+    final Object o2 = processor.doDecodeResponse(stream2,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    checkDecodeResult(o2,
+        new ItemChecker(1, Collections.singletonList("bbb"), null, null));
+
+    final ByteArrayInputStream stream3 = prepareStream(
+        lineDelimiter + lineDelimiter + "id: 3" + lineDelimiter + "data: 
\"ddd\"" + lineDelimiter
+            + "event: test3" + lineDelimiter + "retry: 321" + lineDelimiter);
+    final Object o3 = processor.doDecodeResponse(stream3,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    checkDecodeResult(o3,
+        new ItemChecker(2, Collections.singletonList("ccc"), "test", 123L));
+    final Object o4 = ((Flowable) processor.close()).toList().blockingGet();
+    checkDecodeResult(o4, new ItemChecker(3, Collections.singletonList("ddd"), 
"test3", 321L));
+  }
+
+  @Test
+  public void doDecodeResponseMultiData() throws Exception {
+    doDecodeResponseMultiDataTemplate("\n");
+    doDecodeResponseMultiDataTemplate("\r");
+    doDecodeResponseMultiDataTemplate("\r\n");
+  }
+
+  private void doDecodeResponseMultiDataTemplate(String lineDelimiter) throws 
Exception {
+    final ProduceEventStreamProcessor processor = new 
ProduceEventStreamProcessor();
+
+    final ByteArrayInputStream stream0 = prepareStream(
+        "id: 0" + lineDelimiter + "data: \"aaa1\"" + lineDelimiter + "data: 
\"aaa2\"" + lineDelimiter + lineDelimiter);
+    final Object o0 = processor.doDecodeResponse(stream0,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    checkDecodeResult(o0, new ItemChecker(0, Arrays.asList("aaa1", "aaa2"), 
null, null));
+
+    final ByteArrayInputStream stream1 = prepareStream(
+        "id: 1" + lineDelimiter + "data: \"aaa3\"" + lineDelimiter + "data: 
\"aaa4\"" + lineDelimiter + "data: \"aaa5\""
+            + lineDelimiter + "data: \"aaa6\"");
+    final Object o1 = processor.doDecodeResponse(stream1,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    checkDecodeResult(o1);
+    final Object o2 = ((Flowable) processor.close()).toList().blockingGet();
+    checkDecodeResult(o2, new ItemChecker(1, Arrays.asList("aaa3", "aaa4", 
"aaa5", "aaa6"), null, null));
+  }
+
+  @Test
+  public void doDecodeResponseMultiPackage() throws Exception {
+    doDecodeResponseMultiPackageTemplate("\n");
+    doDecodeResponseMultiPackageTemplate("\r");
+    doDecodeResponseMultiPackageTemplate("\r\n");
+  }
+
+  private void doDecodeResponseMultiPackageTemplate(String lineDelimiter) 
throws Exception {
+    final ProduceEventStreamProcessor processor = new 
ProduceEventStreamProcessor();
+
+    final ByteArrayInputStream stream0 = prepareStream(
+        "id: 0" + lineDelimiter + "data: \"aaa\"" + lineDelimiter + 
lineDelimiter + lineDelimiter);
+    final Object o0 = processor.doDecodeResponse(stream0,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    checkDecodeResult(o0, new ItemChecker(0, Collections.singletonList("aaa"), 
null, null));
+
+    final ByteArrayInputStream stream1 = prepareStream("id: 1" + lineDelimiter 
+ "data: \"bbb\"" + lineDelimiter);
+    final Object o1 = processor.doDecodeResponse(stream1,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    checkDecodeResult(o1);
+
+    final ByteArrayInputStream stream2 = prepareStream(
+        lineDelimiter + "id: 2" + lineDelimiter + "data: \"ccc\"" + 
lineDelimiter + "event: test" + lineDelimiter
+            + "retry: 123" + lineDelimiter + lineDelimiter + "id: 3" + 
lineDelimiter + "data: \"ddd\"" + lineDelimiter
+            + "event: test3" + lineDelimiter + "retry: 321" + lineDelimiter + 
lineDelimiter);
+    final Object o2 = processor.doDecodeResponse(stream2,
+        
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+    checkDecodeResult(o2,
+        new ItemChecker(1, Collections.singletonList("bbb"), null, null),
+        new ItemChecker(2, Collections.singletonList("ccc"), "test", 123L),
+        new ItemChecker(3, Collections.singletonList("ddd"), "test3", 321L));
+    final Object o4 = ((Flowable) processor.close()).toList().blockingGet();
+    checkDecodeResult(o4);
+  }
+
+  @Test
+  public void doDecodeResponseHalfPackage() {
+    for (int splitIndexesCount = 1; splitIndexesCount < 3; 
splitIndexesCount++) {
+      for (int trailingDelimiterCount = 0; trailingDelimiterCount < 3; 
trailingDelimiterCount++) {
+        doDecodeResponseHalfPackageTemplate("\n", splitIndexesCount, 
trailingDelimiterCount);
+        doDecodeResponseHalfPackageTemplate("\r", splitIndexesCount, 
trailingDelimiterCount);
+        doDecodeResponseHalfPackageTemplate("\r\n", splitIndexesCount, 
trailingDelimiterCount);
+      }
+    }
+  }
+
+  private void doDecodeResponseHalfPackageTemplate(String lineDelimiter, int 
splitIndexesCount,
+      int trailingDelimiterCount) {
+    final String messageTemplate =
+        "data: \"中文aaa\"" + lineDelimiter
+            + "id: 0" + lineDelimiter
+            + lineDelimiter
+            + "id: 1" + lineDelimiter
+            + "data: \"bbb中文\"" + lineDelimiter
+            + "data: \"123汉语\"" + lineDelimiter
+            + lineDelimiter
+            + "data: 
\"~!@#$%^&*()_+=-0987654321`中文[]{}\\\\|;':\\\",./<>?abc\"" + lineDelimiter
+            + "data: \"文字ccc\"" + lineDelimiter
+            + "data: \"中文321\"" + lineDelimiter
+            + "id: 2" + lineDelimiter
+            + "retry: 3600" + lineDelimiter
+            + "event: test" + lineDelimiter.repeat(trailingDelimiterCount);
+    final byte[] messageTemplateBytes = 
messageTemplate.getBytes(StandardCharsets.UTF_8);
+
+    int[] splitIndexes = new int[splitIndexesCount];
+    try {
+      runDecodeResponseHalfPackageTemplate(0, splitIndexes, 
messageTemplateBytes);
+    } catch (AssertionError e) {
+      throw new AssertionError(e.getMessage() + ", messageTemplate=["
+          + messageTemplate
+          + "]", e);
+    }
+  }
+
+  private void runDecodeResponseHalfPackageTemplate(int cursor, int[] 
splitIndexes, byte[] messageTemplateBytes) {
+    if (cursor != splitIndexes.length) {
+      for (int i = cursor == 0 ? 0 : splitIndexes[cursor - 1]; i <= 
messageTemplateBytes.length; ++i) {
+        splitIndexes[cursor] = i;
+        runDecodeResponseHalfPackageTemplate(cursor + 1, splitIndexes, 
messageTemplateBytes);
+      }
+      return;
+    }
+
+    final List<ByteArrayInputStream> byteArrayInputStreams = 
splitByteArrayInputStreams(
+        splitIndexes, messageTemplateBytes);
+
+    final ProduceEventStreamProcessor processor = new 
ProduceEventStreamProcessor();
+    final JavaType javaType = 
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class);
+
+    final List<SseEventResponseEntity<?>> entityList = new ArrayList<>();
+    for (ByteArrayInputStream byteArrayInputStream : byteArrayInputStreams) {
+      final List<SseEventResponseEntity<?>> entities;
+      try {
+        entities = (List<SseEventResponseEntity<?>>) processor.decodeResponse(
+            byteArrayInputStream, javaType);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      entityList.addAll(entities);
+    }
+    try {
+      entityList.addAll(((Flowable<SseEventResponseEntity<?>>) 
processor.close()).toList().blockingGet());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    try {
+      checkDecodeResult(entityList,
+          new ItemChecker(0, Collections.singletonList("中文aaa"), null, null),
+          new ItemChecker(1, Arrays.asList("bbb中文", "123汉语"), null, null),
+          new ItemChecker(2, 
Arrays.asList("~!@#$%^&*()_+=-0987654321`中文[]{}\\|;':\",./<>?abc",
+              "文字ccc", "中文321"), "test", 3600L));
+    } catch (AssertionError e) {
+      throw new AssertionError(e.getMessage() + System.lineSeparator() +
+          ", splitIndexes=" + Arrays.toString(splitIndexes), e);
+    }
+  }
+
+  private List<ByteArrayInputStream> splitByteArrayInputStreams(int[] 
splitIndexes, byte[] messageTemplateBytes) {
+    int splitHeader = 0;
+    final List<ByteArrayInputStream> byteArrayInputStreams = new ArrayList<>();
+    for (int split : splitIndexes) {
+      final byte[] segment = Arrays.copyOfRange(messageTemplateBytes, 
splitHeader, split);
+      final ByteArrayInputStream stream = new ByteArrayInputStream(segment);
+      byteArrayInputStreams.add(stream);
+      splitHeader = split;
+    }
+
+    final byte[] segment = Arrays.copyOfRange(messageTemplateBytes, 
splitIndexes[splitIndexes.length - 1],
+        messageTemplateBytes.length);
+    final ByteArrayInputStream stream = new ByteArrayInputStream(segment);
+    byteArrayInputStreams.add(stream);
+
+    return byteArrayInputStreams;
+  }
+
+  private void checkDecodeResult(Object obj) {
+    MatcherAssert.assertThat(obj, Matchers.instanceOf(List.class));
+    MatcherAssert.assertThat(((List<?>) obj).size(), Matchers.equalTo(0));
+  }
+
+  private void checkDecodeResult(Object obj, ItemChecker... checkers) {
+    MatcherAssert.assertThat(obj, Matchers.instanceOf(List.class));
+    final List<?> objList = (List<?>) obj;
+    MatcherAssert.assertThat("expect size of objList is " + checkers.length,
+        objList.size(), Matchers.equalTo(checkers.length));
+
+    for (int i = 0; i < objList.size(); i++) {
+      Object result = objList.get(i);
+      checkers[i]
+          .check(result);
+    }
+  }
+
+  private ByteArrayInputStream prepareStream(String buffer) {
+    return new ByteArrayInputStream(
+        buffer.getBytes(StandardCharsets.UTF_8));
+  }
+
+  private static class ItemChecker {
+    private int id;
+
+    private List<String> expectDataList;
+
+    private String event;
+
+    private Long retry;
+
+    public ItemChecker(int id, List<String> expectDataList, String event, Long 
retry) {
+      this.id = id;
+      this.expectDataList = expectDataList;
+      this.event = event;
+      this.retry = retry;
+    }
+
+    public void check(Object actualResult) {
+      final SseEventResponseEntity<?> entity0 = ((SseEventResponseEntity<?>) 
actualResult);
+      MatcherAssert.assertThat(entity0.getData(), 
Matchers.instanceOf(List.class));
+      final List<?> data = entity0.getData();
+      MatcherAssert.assertThat("actual data = " + data.toString(), data.size(),
+          Matchers.equalTo(expectDataList.size()));
+      for (int i = 0; i < data.size(); i++) {
+        MatcherAssert.assertThat(data.get(i), 
Matchers.equalTo(expectDataList.get(i)));
+      }
+      MatcherAssert.assertThat("actual event = " + entity0.getEvent(),
+          entity0.getEvent(), event == null ? Matchers.nullValue() : 
Matchers.equalTo(event));
+      MatcherAssert.assertThat("actual retry = " + entity0.getRetry(),
+          entity0.getRetry(), retry == null ? Matchers.nullValue() : 
Matchers.equalTo(retry));
+      MatcherAssert.assertThat("actual id = " + entity0.getId(),
+          entity0.getId(), Matchers.equalTo(id));
+    }
+  }
+}
diff --git a/demo/demo-cse-v2/gateway/pom.xml b/demo/demo-cse-v2/gateway/pom.xml
index 3dc9ed6c3..4d491cdf8 100644
--- a/demo/demo-cse-v2/gateway/pom.xml
+++ b/demo/demo-cse-v2/gateway/pom.xml
@@ -33,6 +33,20 @@
     <dependency>
       <groupId>org.apache.servicecomb</groupId>
       <artifactId>java-chassis-spring-boot-starter-standalone</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-to-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.servicecomb</groupId>
diff --git a/demo/demo-cse-v2/provider/pom.xml 
b/demo/demo-cse-v2/provider/pom.xml
index c5b57816f..e186908d3 100644
--- a/demo/demo-cse-v2/provider/pom.xml
+++ b/demo/demo-cse-v2/provider/pom.xml
@@ -52,6 +52,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>io.reactivex.rxjava3</groupId>
+      <artifactId>rxjava</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
 b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java
similarity index 55%
copy from 
foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
copy to 
demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java
index 265ee658e..7cfdf6bdf 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++ 
b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java
@@ -15,26 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.foundation.vertx.http;
+package org.apache.servicecomb.demo.model;
 
-import java.util.concurrent.CompletableFuture;
+public class Model {
+  private String name;
 
-import jakarta.servlet.http.HttpServletResponse;
-import jakarta.servlet.http.Part;
-import jakarta.ws.rs.core.Response.StatusType;
+  private int age;
 
-import io.vertx.core.http.HttpHeaders;
+  public Model() {
 
-public interface HttpServletResponseEx extends HttpServletResponse, 
BodyBufferSupport {
-  StatusType getStatusType();
+  }
+
+  public Model(String name, int age) {
+    this.name = name;
+    this.age = age;
+  }
 
-  void setAttribute(String key, Object value);
+  public int getAge() {
+    return age;
+  }
 
-  Object getAttribute(String key);
+  public Model setAge(int age) {
+    this.age = age;
+    return this;
+  }
 
-  CompletableFuture<Void> sendPart(Part body);
+  public String getName() {
+    return name;
+  }
 
-  default void setChunked(boolean chunked) {
-    setHeader(HttpHeaders.TRANSFER_ENCODING.toString(), 
HttpHeaders.CHUNKED.toString());
+  public Model setName(String name) {
+    this.name = name;
+    return this;
   }
 }
diff --git 
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
 
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
new file mode 100644
index 000000000..4830f872c
--- /dev/null
+++ 
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.springboot.springmvc.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.apache.servicecomb.demo.CategorizedTestCase;
+import org.apache.servicecomb.demo.TestMgr;
+import org.apache.servicecomb.demo.model.Model;
+import 
org.apache.servicecomb.springboot.springmvc.client.ThirdSvcConfiguration.ReactiveStreamClient;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ReactiveStreamIT implements CategorizedTestCase {
+  @Autowired
+  @Qualifier("reactiveStreamProvider")
+  ReactiveStreamClient reactiveStreamProvider;
+
+  @Override
+  public void testRestTransport() throws Exception {
+    testSseString(reactiveStreamProvider);
+    testSseStringWithParam(reactiveStreamProvider);
+    testSseModel(reactiveStreamProvider);
+    testSseResponseEntity(reactiveStreamProvider);
+    testSseMultipleData(reactiveStreamProvider);
+  }
+
+  private void testSseString(ReactiveStreamClient client) throws Exception {
+    Publisher<String> result = client.sseString();
+    TestMgr.check("abc", buildBufferString(result));
+  }
+
+  private void testSseStringWithParam(ReactiveStreamClient client) throws 
Exception {
+    Publisher<String> result = client.sseStringWithParam("d");
+    TestMgr.check("abcd", buildBufferString(result));
+  }
+
+  private String buildBufferString(Publisher<String> result) throws Exception {
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    StringBuilder buffer = new StringBuilder();
+    result.subscribe(new Subscriber<>() {
+      Subscription subscription;
+
+      @Override
+      public void onSubscribe(Subscription s) {
+        subscription = s;
+        subscription.request(1);
+      }
+
+      @Override
+      public void onNext(String s) {
+        buffer.append(s);
+        subscription.request(1);
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        subscription.cancel();
+        countDownLatch.countDown();
+      }
+
+      @Override
+      public void onComplete() {
+        countDownLatch.countDown();
+      }
+    });
+    countDownLatch.await(10, TimeUnit.SECONDS);
+    return buffer.toString();
+  }
+
+  private void testSseModel(ReactiveStreamClient client) throws Exception {
+    Publisher<Model> result = client.sseModel();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    StringBuilder buffer = new StringBuilder();
+    result.subscribe(new Subscriber<>() {
+      Subscription subscription;
+
+      @Override
+      public void onSubscribe(Subscription s) {
+        subscription = s;
+        subscription.request(1);
+      }
+
+      @Override
+      public void onNext(Model model) {
+        buffer.append(model.getName()).append(model.getAge());
+        subscription.request(1);
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        subscription.cancel();
+        countDownLatch.countDown();
+      }
+
+      @Override
+      public void onComplete() {
+        countDownLatch.countDown();
+      }
+    });
+    countDownLatch.await(10, TimeUnit.SECONDS);
+    TestMgr.check("jack0jack1jack2jack3jack4", buffer.toString());
+  }
+
+  private void testSseResponseEntity(ReactiveStreamClient client) throws 
Exception {
+    Publisher<SseEventResponseEntity<Model>> result = 
client.sseResponseEntity();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    StringBuilder buffer = new StringBuilder();
+    result.subscribe(new Subscriber<>() {
+      Subscription subscription;
+
+      @Override
+      public void onSubscribe(Subscription s) {
+        subscription = s;
+        subscription.request(1);
+      }
+
+      @Override
+      public void onNext(SseEventResponseEntity<Model> responseEntity) {
+        if (!StringUtils.isEmpty(responseEntity.getEvent())) {
+          buffer.append(responseEntity.getEvent());
+        }
+        for (Model model : responseEntity.getData()) {
+          buffer.append(model.getName()).append(model.getAge());
+        }
+        subscription.request(1);
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        subscription.cancel();
+        countDownLatch.countDown();
+      }
+
+      @Override
+      public void onComplete() {
+        countDownLatch.countDown();
+      }
+    });
+    countDownLatch.await(10, TimeUnit.SECONDS);
+    TestMgr.check("test0jack0test1jack1test2jack2", buffer.toString());
+  }
+
+  private void testSseMultipleData(ReactiveStreamClient client) throws 
Exception {
+    Publisher<SseEventResponseEntity<Model>> result = client.sseMultipleData();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    StringBuilder buffer = new StringBuilder();
+    result.subscribe(new Subscriber<>() {
+      Subscription subscription;
+
+      @Override
+      public void onSubscribe(Subscription s) {
+        subscription = s;
+        subscription.request(1);
+      }
+
+      @Override
+      public void onNext(SseEventResponseEntity<Model> responseEntity) {
+        if (!StringUtils.isEmpty(responseEntity.getEvent())) {
+          buffer.append(responseEntity.getEvent());
+        }
+        for (Model model : responseEntity.getData()) {
+          buffer.append(model.getName()).append(model.getAge());
+        }
+        subscription.request(1);
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        subscription.cancel();
+        countDownLatch.countDown();
+      }
+
+      @Override
+      public void onComplete() {
+        countDownLatch.countDown();
+      }
+    });
+    countDownLatch.await(10, TimeUnit.SECONDS);
+    TestMgr.check("test0jack0tom0test1jack1tom1test2jack2tom2", 
buffer.toString());
+  }
+}
diff --git 
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java
 
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java
new file mode 100644
index 000000000..3bd1986d9
--- /dev/null
+++ 
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.springboot.springmvc.client;
+
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.apache.servicecomb.demo.model.Model;
+import org.apache.servicecomb.provider.pojo.Invoker;
+import org.reactivestreams.Publisher;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+
+@Configuration
+public class ThirdSvcConfiguration {
+  @RequestMapping(path = "/")
+  public interface ReactiveStreamClient {
+    @GetMapping("/sseString")
+    Publisher<String> sseString();
+
+    @GetMapping("/sseStringWithParam")
+    Publisher<String> sseStringWithParam(String param);
+
+    @GetMapping("/sseModel")
+    Publisher<Model> sseModel();
+
+    @GetMapping("/sseResponseEntity")
+    Publisher<SseEventResponseEntity<Model>> sseResponseEntity();
+
+    @GetMapping("/sseMultipleData")
+    Publisher<SseEventResponseEntity<Model>> sseMultipleData();
+  }
+
+  @Bean("reactiveStreamProvider")
+  public ReactiveStreamClient reactiveStreamProvider() {
+    return Invoker.createProxy("springmvc", "ReactiveStreamController", 
ReactiveStreamClient.class);
+  }
+}
diff --git 
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java
 
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java
new file mode 100644
index 000000000..323d3fbc3
--- /dev/null
+++ 
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.springboot.springmvc.server;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.apache.servicecomb.demo.model.Model;
+import org.apache.servicecomb.provider.rest.common.RestSchema;
+import org.reactivestreams.Publisher;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import io.reactivex.rxjava3.core.Flowable;
+
+@RestSchema(schemaId = "ReactiveStreamController")
+@RequestMapping(path = "/")
+public class ReactiveStreamController {
+  @GetMapping("/sseString")
+  public Publisher<String> sseString() {
+    return Flowable.fromArray("a", "b", "c");
+  }
+
+  @GetMapping("/sseStringWithParam")
+  public Publisher<String> sseStringWithParam(@RequestParam(name = "param") 
String param) {
+    return Flowable.fromArray("a", "b", "c", param);
+  };
+
+  @GetMapping("/sseModel")
+  public Publisher<Model> sseModel() {
+    return Flowable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
+        .map(item -> new Model("jack", item.intValue()));
+  }
+
+  @GetMapping("/sseResponseEntity")
+  public Publisher<SseEventResponseEntity<Model>> sseResponseEntity() {
+    AtomicInteger index = new AtomicInteger(0);
+    return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS)
+        .map(item -> new SseEventResponseEntity<Model>()
+            .event("test" + index)
+            .id(index.getAndIncrement())
+            .retry(System.currentTimeMillis())
+            .data(new Model("jack", item.intValue())));
+  }
+
+  @GetMapping("/sseMultipleData")
+  public Publisher<SseEventResponseEntity<Model>> sseMultipleData() {
+    AtomicInteger index = new AtomicInteger(0);
+    return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS)
+        .map(item -> new SseEventResponseEntity<Model>()
+            .event("test" + index)
+            .id(index.getAndIncrement())
+            .retry(System.currentTimeMillis())
+            .data(new Model("jack", item.intValue()))
+            .data(new Model("tom", item.intValue())));
+  }
+}
diff --git a/dependencies/default/pom.xml b/dependencies/default/pom.xml
index 3132ff8ac..efeb106ed 100644
--- a/dependencies/default/pom.xml
+++ b/dependencies/default/pom.xml
@@ -107,6 +107,8 @@
     <jakarta.xml.bind.version>4.0.2</jakarta.xml.bind.version>
     <jaxb-api.version>2.3.1</jaxb-api.version>
     <groovy.version>3.0.9</groovy.version>
+    <reactive-streams.version>1.0.4</reactive-streams.version>
+    <rxjava3.version>3.1.10</rxjava3.version>
     <!-- Base dir of main -->
     <main.basedir>${basedir}/../..</main.basedir>
   </properties>
@@ -397,6 +399,21 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.reactivestreams</groupId>
+        <artifactId>reactive-streams</artifactId>
+        <version>${reactive-streams.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.reactivex.rxjava3</groupId>
+        <artifactId>rxjava</artifactId>
+        <version>${rxjava3.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.vertx</groupId>
+        <artifactId>vertx-rx-java3</artifactId>
+        <version>${vertx.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>org.apache.maven</groupId>
diff --git a/foundations/foundation-vertx/pom.xml 
b/foundations/foundation-vertx/pom.xml
index f9bcd9a25..c0f80507b 100644
--- a/foundations/foundation-vertx/pom.xml
+++ b/foundations/foundation-vertx/pom.xml
@@ -43,6 +43,10 @@
       <groupId>org.apache.servicecomb</groupId>
       <artifactId>foundation-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.vertx</groupId>
+      <artifactId>vertx-rx-java3</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>io.vertx</groupId>
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
index 5d363e874..d8800800f 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
@@ -144,7 +144,7 @@ public class TcpClientConnection extends TcpConnection {
     if (Status.WORKING.equals(status)) {
       // encode in sender thread
       try (TcpOutputStream os = tcpClientPackage.createStream()) {
-        write(os.getByteBuf());
+        write(os.getBuffer());
         tcpClientPackage.finishWriteToBuffer();
       }
       return true;
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
index 2a0559bab..fc1fc17df 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
@@ -25,6 +25,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import io.vertx.core.buffer.Buffer;
 import jakarta.servlet.ServletOutputStream;
 import jakarta.servlet.http.Cookie;
 import jakarta.servlet.http.Part;
@@ -85,7 +86,7 @@ public abstract class AbstractHttpServletResponse extends 
BodyBufferSupportImpl
 
   @Override
   public void flushBuffer() throws IOException {
-    throw new Error("not supported method");
+    // for vert.x do noting
   }
 
   @Override
@@ -222,4 +223,14 @@ public abstract class AbstractHttpServletResponse extends 
BodyBufferSupportImpl
   public CompletableFuture<Void> sendPart(Part body) {
     throw new Error("not supported method");
   }
+
+  @Override
+  public CompletableFuture<Void> sendBuffer(Buffer buffer) {
+    throw new Error("not supported method");
+  }
+
+  @Override
+  public void endResponse() throws IOException {
+    throw new Error("not supported method");
+  }
 }
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
index 265ee658e..dfad8711a 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
@@ -17,12 +17,15 @@
 
 package org.apache.servicecomb.foundation.vertx.http;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 import jakarta.servlet.http.HttpServletResponse;
 import jakarta.servlet.http.Part;
 import jakarta.ws.rs.core.Response.StatusType;
 
+import io.reactivex.rxjava3.core.Flowable;
+import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpHeaders;
 
 public interface HttpServletResponseEx extends HttpServletResponse, 
BodyBufferSupport {
@@ -37,4 +40,16 @@ public interface HttpServletResponseEx extends 
HttpServletResponse, BodyBufferSu
   default void setChunked(boolean chunked) {
     setHeader(HttpHeaders.TRANSFER_ENCODING.toString(), 
HttpHeaders.CHUNKED.toString());
   }
+
+  CompletableFuture<Void> sendBuffer(Buffer buffer);
+
+  default Flowable<Buffer> getFlowableBuffer() {
+    return null;
+  }
+
+  void endResponse() throws IOException;
+
+  default void setChunkedForEvent(boolean chunked) {
+    // not set header transfer-encoding=chunked in Rest Over Servlet, or will 
have Multiple in response.
+  }
 }
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
index fbe459745..a227560cd 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
@@ -83,7 +83,7 @@ public class StandardHttpServletResponseEx extends 
HttpServletResponseWrapper im
   }
 
   @Override
-  public void flushBuffer() throws IOException {
+  public void endResponse() throws IOException {
     byte[] bytes = getBodyBytes();
     if (bytes != null) {
       getOutputStream().write(bytes, 0, getBodyBytesLength());
@@ -122,4 +122,16 @@ public class StandardHttpServletResponseEx extends 
HttpServletResponseWrapper im
     Context context = Vertx.currentContext();
     return new PumpFromPart(context, part).toOutputStream(outputStream, false);
   }
+
+  @Override
+  public CompletableFuture<Void> sendBuffer(Buffer buffer) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try {
+      getOutputStream().write(buffer.getBytes(), 0, buffer.length());
+      future.complete(null);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
 }
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
index da7e53217..0d8eaef9f 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
@@ -24,6 +24,7 @@ import jakarta.ws.rs.core.Response.StatusType;
 
 import org.apache.servicecomb.foundation.common.http.HttpStatus;
 
+import io.reactivex.rxjava3.core.Flowable;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpClientResponse;
 
@@ -32,11 +33,18 @@ public class VertxClientResponseToHttpServletResponse 
extends AbstractHttpServle
 
   private StatusType statusType;
 
+  private Flowable<Buffer> flowableBuffer;
+
   public VertxClientResponseToHttpServletResponse(HttpClientResponse 
clientResponse, Buffer bodyBuffer) {
     this.clientResponse = clientResponse;
     setBodyBuffer(bodyBuffer);
   }
 
+  public VertxClientResponseToHttpServletResponse(HttpClientResponse 
clientResponse, Flowable<Buffer> buffer) {
+    this.clientResponse = clientResponse;
+    this.flowableBuffer = buffer;
+  }
+
   @Override
   public int getStatus() {
     return clientResponse.statusCode();
@@ -69,4 +77,9 @@ public class VertxClientResponseToHttpServletResponse extends 
AbstractHttpServle
   public Collection<String> getHeaderNames() {
     return clientResponse.headers().names();
   }
+
+  @Override
+  public Flowable<Buffer> getFlowableBuffer() {
+    return flowableBuffer;
+  }
 }
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
index e0c518b29..5cb640f38 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.foundation.vertx.http;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import 
org.apache.servicecomb.foundation.vertx.stream.PumpFromPart;
 
 import io.vertx.core.Context;
 import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpServerResponse;
 
 public class VertxServerResponseToHttpServletResponse extends 
AbstractHttpServletResponse {
@@ -100,7 +102,7 @@ public class VertxServerResponseToHttpServletResponse 
extends AbstractHttpServle
   }
 
   @Override
-  public void flushBuffer() {
+  public void endResponse() {
     if (context == Vertx.currentContext()) {
       internalFlushBuffer();
       return;
@@ -134,4 +136,26 @@ public class VertxServerResponseToHttpServletResponse 
extends AbstractHttpServle
   public void setChunked(boolean chunked) {
     serverResponse.setChunked(chunked);
   }
+
+  @Override
+  public CompletableFuture<Void> sendBuffer(Buffer buffer) {
+    if (serverResponse.closed()) {
+      return CompletableFuture.failedFuture(new IOException("Response is 
closed before sending any data. "
+          + "Maybe client is timeout or check idle connection timeout for 
provider is properly configured."));
+    }
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    serverResponse.write(buffer).onComplete(result -> {
+      if (result.failed()) {
+        future.completeExceptionally(result.cause());
+      } else {
+        future.complete(null);
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public void setChunkedForEvent(boolean chunked) {
+    this.setChunked(chunked);
+  }
 }
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
index f063d5d06..42c9b3126 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
@@ -20,9 +20,7 @@ package org.apache.servicecomb.foundation.vertx.stream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 
-import io.netty.buffer.ByteBuf;
 import io.vertx.core.buffer.Buffer;
-import io.vertx.core.buffer.impl.VertxByteBufAllocator;
 
 /**
  * BufferOutputStream.
@@ -32,40 +30,36 @@ import io.vertx.core.buffer.impl.VertxByteBufAllocator;
 public class BufferOutputStream extends OutputStream {
   private static final int DIRECT_BUFFER_SIZE = 1024;
 
-  protected ByteBuf byteBuf;
+  protected Buffer byteBuf;
 
   public BufferOutputStream() {
-    this(VertxByteBufAllocator.DEFAULT.heapBuffer(DIRECT_BUFFER_SIZE, 
Integer.MAX_VALUE));
+    this(Buffer.buffer(DIRECT_BUFFER_SIZE));
   }
 
-  public BufferOutputStream(ByteBuf buffer) {
+  public BufferOutputStream(Buffer buffer) {
     this.byteBuf = buffer;
   }
 
-  public ByteBuf getByteBuf() {
-    return byteBuf;
-  }
-
   public Buffer getBuffer() {
-    return Buffer.buffer(byteBuf);
+    return byteBuf;
   }
 
   public int length() {
-    return byteBuf.readableBytes();
+    return byteBuf.length();
   }
 
   public void writeByte(byte value) {
-    byteBuf.writeByte(value);
+    byteBuf.appendByte(value);
   }
 
   // 实际是写byte
   @Override
   public void write(int byteValue) {
-    byteBuf.writeByte((byte) byteValue);
+    byteBuf.appendByte((byte) byteValue);
   }
 
   public void write(boolean value) {
-    byteBuf.writeBoolean(value);
+    byteBuf.appendByte(value ? (byte) 1 : (byte) 0);
   }
 
   public void writeInt(int pos, int value) {
@@ -73,20 +67,20 @@ public class BufferOutputStream extends OutputStream {
   }
 
   public void writeShort(short value) {
-    byteBuf.writeShort(value);
+    byteBuf.appendShort(value);
   }
 
   public void writeInt(int value) {
-    byteBuf.writeInt(value);
+    byteBuf.appendInt(value);
   }
 
   public void writeLong(long value) {
-    byteBuf.writeLong(value);
+    byteBuf.appendLong(value);
   }
 
   public void writeString(String value) {
-    byteBuf.writeInt(value.length());
-    byteBuf.writeCharSequence(value, StandardCharsets.UTF_8);
+    writeInt(value.length());
+    byteBuf.appendString(value, StandardCharsets.UTF_8.toString());
   }
 
   @Override
@@ -96,7 +90,7 @@ public class BufferOutputStream extends OutputStream {
 
   @Override
   public void write(byte[] bytes, int offset, int len) {
-    byteBuf.writeBytes(bytes, offset, len);
+    byteBuf.appendBytes(bytes, offset, len);
   }
 
   @Override
@@ -105,6 +99,6 @@ public class BufferOutputStream extends OutputStream {
   }
 
   public int writerIndex() {
-    return byteBuf.writerIndex();
+    return byteBuf.length();
   }
 }
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
index 95f460013..396cf1634 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
@@ -20,7 +20,6 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.vertx.core.Context;
@@ -49,7 +48,7 @@ public class TcpConnection {
   // so this optimization:
   // 1.avoid vertx's lock
   // 2.reduce netty's task schedule
-  private final Queue<ByteBuf> writeQueue = new ConcurrentLinkedQueue<>();
+  private final Queue<Buffer> writeQueue = new ConcurrentLinkedQueue<>();
 
   private final AtomicLong writeQueueSize = new AtomicLong();
 
@@ -83,7 +82,7 @@ public class TcpConnection {
     this.context = netSocket.getContext();
   }
 
-  public void write(ByteBuf buf) {
+  public void write(Buffer buf) {
     writeQueue.add(buf);
     long oldSize = writeQueueSize.getAndIncrement();
     if (oldSize == 0) {
@@ -99,13 +98,13 @@ public class TcpConnection {
   protected void writeInContext() {
     CompositeByteBuf cbb = ByteBufAllocator.DEFAULT.compositeBuffer();
     for (; ; ) {
-      ByteBuf buf = writeQueue.poll();
+      Buffer buf = writeQueue.poll();
       if (buf == null) {
         break;
       }
 
       writeQueueSize.decrementAndGet();
-      cbb.addComponent(true, buf);
+      cbb.addComponent(true, buf.getByteBuf());
 
       if (cbb.numComponents() == cbb.maxNumComponents()) {
         CompositeByteBuf last = cbb;
diff --git 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
index 2c9fdc895..cd468a935 100644
--- 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
+++ 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
@@ -53,7 +53,7 @@ public class TestStream {
     Assertions.assertTrue((1 < oBufferOutputStream.length()));
 
     @SuppressWarnings("resource")
-    BufferInputStream oBufferInputStream = new 
BufferInputStream(oBufferOutputStream.getByteBuf());
+    BufferInputStream oBufferInputStream = new 
BufferInputStream(oBufferOutputStream.getBuffer().getByteBuf());
     Assertions.assertEquals("test", oBufferInputStream.readString());
     Assertions.assertEquals(1, oBufferInputStream.readByte());
     Assertions.assertEquals(true, oBufferInputStream.readBoolean());
diff --git 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
index 0767898f2..a10a0c193 100644
--- 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
+++ 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
@@ -27,11 +27,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.vertx.core.AsyncResult;
 import io.vertx.core.Context;
 import io.vertx.core.Handler;
 import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.NetSocket;
 import io.vertx.core.net.impl.NetSocketImpl;
 import mockit.Deencapsulation;
@@ -90,14 +90,14 @@ public class TestTcpClientConnection {
     Deencapsulation.setField(tcpClientConnection, "status", Status.WORKING);
 
     long msgId = 1;
-    ByteBuf byteBuf = Unpooled.buffer();
+    Buffer byteBuf = Buffer.buffer();
     new Expectations(tcpClientConnection) {
       {
         tcpClientPackage.getMsgId();
         result = msgId;
         tcpClientPackage.createStream();
         result = tcpOutputStream;
-        tcpOutputStream.getByteBuf();
+        tcpOutputStream.getBuffer();
         result = byteBuf;
       }
     };
@@ -172,7 +172,7 @@ public class TestTcpClientConnection {
       {
         tcpClientPackage.getMsgId();
         result = msgId;
-        tcpClientConnection.write((ByteBuf) any);
+        tcpClientConnection.write((Buffer) any);
       }
     };
     new MockUp<Context>(context) {
diff --git 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
index e6083edd0..053cc56e7 100644
--- 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
+++ 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
@@ -91,8 +91,7 @@ public class TestAbstractHttpServletResponse {
 
   @Test
   public void testFlushBuffer() {
-    Error error = Assertions.assertThrows(Error.class, () -> 
response.flushBuffer());
-    checkError(error);
+    Assertions.assertDoesNotThrow(() -> response.flushBuffer());
   }
 
   @Test
diff --git 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
index c672fb288..5c34814f1 100644
--- 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
+++ 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
@@ -74,7 +74,7 @@ public class TestStandardHttpServletResponseEx {
   }
 
   @Test
-  public void flushBuffer() throws IOException {
+  public void endResponse() throws IOException {
     Buffer buffer = Buffer.buffer();
     ServletOutputStream output = new ServletOutputStream() {
       @Override
@@ -97,12 +97,12 @@ public class TestStandardHttpServletResponseEx {
     responseEx = new StandardHttpServletResponseEx(response);
 
     // no body
-    responseEx.flushBuffer();
+    responseEx.endResponse();
     Assertions.assertEquals(0, buffer.length());
 
     Buffer body = Buffer.buffer().appendString("body");
     responseEx.setBodyBuffer(body);
-    responseEx.flushBuffer();
+    responseEx.endResponse();
     Assertions.assertEquals("body", buffer.toString());
   }
 
diff --git 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
index e438da8b0..5778091ac 100644
--- 
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
+++ 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
@@ -253,21 +253,21 @@ public class TestVertxServerResponseToHttpServletResponse 
{
   }
 
   @Test
-  public void flushBuffer_sameContext() throws IOException {
-    response.flushBuffer();
+  public void endResponse_sameContext() throws IOException {
+    response.endResponse();
 
     Assertions.assertFalse(runOnContextInvoked);
   }
 
   @Test
-  public void flushBuffer_diffContext() throws IOException {
+  public void endResponse_diffContext() throws IOException {
     new Expectations() {
       {
         Vertx.currentContext();
         result = null;
       }
     };
-    response.flushBuffer();
+    response.endResponse();
 
     Assertions.assertTrue(runOnContextInvoked);
   }
diff --git a/swagger/swagger-generator/generator-core/pom.xml 
b/swagger/swagger-generator/generator-core/pom.xml
index c0aa02036..4a5825800 100644
--- a/swagger/swagger-generator/generator-core/pom.xml
+++ b/swagger/swagger-generator/generator-core/pom.xml
@@ -51,6 +51,10 @@
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.reactivestreams</groupId>
+      <artifactId>reactive-streams</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.junit.jupiter</groupId>
diff --git 
a/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java
 
b/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java
new file mode 100644
index 000000000..2f59fcb64
--- /dev/null
+++ 
b/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.swagger.generator.core.processor.response;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+import org.apache.servicecomb.swagger.generator.OperationGenerator;
+import org.apache.servicecomb.swagger.generator.SwaggerGenerator;
+import org.apache.servicecomb.swagger.jakarta.ModelConvertersAdapterJakarta;
+import org.reactivestreams.Publisher;
+
+import io.swagger.models.Model;
+import jakarta.ws.rs.core.MediaType;
+
+public class PublisherProcessor extends DefaultResponseTypeProcessor {
+  protected static final List<String> EVENTS_PRODUCE = 
List.of(MediaType.SERVER_SENT_EVENTS);
+
+  public PublisherProcessor() {
+    extractActualType = true;
+  }
+
+  @Override
+  public Type getProcessType() {
+    return Publisher.class;
+  }
+
+  @Override
+  public Model process(SwaggerGenerator swaggerGenerator, OperationGenerator 
operationGenerator,
+      Type genericResponseType) {
+    
ModelConvertersAdapterJakarta.getInstance().addClassToSkip(Publisher.class.getName());
+    operationGenerator.getOperation().produces(EVENTS_PRODUCE);
+    return super.process(swaggerGenerator, operationGenerator, 
genericResponseType);
+  }
+}
diff --git 
a/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
 
b/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
index 53678b702..9abf37e8c 100644
--- 
a/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
+++ 
b/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
@@ -17,3 +17,4 @@
 
 
org.apache.servicecomb.swagger.generator.core.processor.response.CompletableFutureProcessor
 
org.apache.servicecomb.swagger.generator.core.processor.response.OptionalProcessor
+org.apache.servicecomb.swagger.generator.core.processor.response.PublisherProcessor
diff --git a/swagger/swagger-invocation/invocation-core/pom.xml 
b/swagger/swagger-invocation/invocation-core/pom.xml
index 1680a0044..3bb762136 100644
--- a/swagger/swagger-invocation/invocation-core/pom.xml
+++ b/swagger/swagger-invocation/invocation-core/pom.xml
@@ -31,6 +31,14 @@
       <groupId>org.apache.servicecomb</groupId>
       <artifactId>swagger-generator-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.vertx</groupId>
+      <artifactId>vertx-rx-java3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.vertx</groupId>
+      <artifactId>vertx-core</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.servicecomb</groupId>
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java
similarity index 51%
copy from 
foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
copy to 
swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java
index 265ee658e..636af6558 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java
@@ -14,27 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.servicecomb.swagger.invocation.response.consumer;
 
-package org.apache.servicecomb.foundation.vertx.http;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
 
-import java.util.concurrent.CompletableFuture;
+import io.reactivex.rxjava3.core.Flowable;
 
-import jakarta.servlet.http.HttpServletResponse;
-import jakarta.servlet.http.Part;
-import jakarta.ws.rs.core.Response.StatusType;
+public class PublisherConsumerResponseMapper implements ConsumerResponseMapper 
{
+  private final boolean shouldExtractEntity;
 
-import io.vertx.core.http.HttpHeaders;
-
-public interface HttpServletResponseEx extends HttpServletResponse, 
BodyBufferSupport {
-  StatusType getStatusType();
-
-  void setAttribute(String key, Object value);
-
-  Object getAttribute(String key);
-
-  CompletableFuture<Void> sendPart(Part body);
+  public PublisherConsumerResponseMapper(boolean shouldExtractEntity) {
+    this.shouldExtractEntity = shouldExtractEntity;
+  }
 
-  default void setChunked(boolean chunked) {
-    setHeader(HttpHeaders.TRANSFER_ENCODING.toString(), 
HttpHeaders.CHUNKED.toString());
+  @Override
+  public Object mapResponse(Response response) {
+    Flowable<SseEventResponseEntity<?>> flowable = response.getResult();
+    if (shouldExtractEntity) {
+      return flowable.concatMap(entity -> 
Flowable.fromIterable(entity.getData()));
+    }
+    return flowable;
   }
 }
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java
new file mode 100644
index 000000000..e7dd6d63c
--- /dev/null
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.swagger.invocation.response.consumer;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import 
org.apache.servicecomb.swagger.invocation.response.ResponseMapperFactorys;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
+
+public class PublisherConsumerResponseMapperFactory implements 
ConsumerResponseMapperFactory {
+  @Override
+  public boolean isMatch(Type consumerType) {
+    if (!ParameterizedType.class.isAssignableFrom(consumerType.getClass())) {
+      return false;
+    }
+
+    return ((ParameterizedType) 
consumerType).getRawType().equals(Publisher.class);
+  }
+
+  @Override
+  public ConsumerResponseMapper 
createResponseMapper(ResponseMapperFactorys<ConsumerResponseMapper> factorys,
+      Type consumerType) {
+    Type realConsumerType = ((ParameterizedType) 
consumerType).getActualTypeArguments()[0];
+    return new PublisherConsumerResponseMapper(
+        
!realConsumerType.getTypeName().contains(SseEventResponseEntity.class.getName()));
+  }
+}
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
new file mode 100644
index 000000000..c5f56bb53
--- /dev/null
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.swagger.invocation.response.producer;
+
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+
+import io.reactivex.rxjava3.core.Flowable;
+import jakarta.ws.rs.core.Response.StatusType;
+
+public class PublisherProducerResponseMapper implements ProducerResponseMapper 
{
+  private final boolean shouldConstructEntity;
+
+  public PublisherProducerResponseMapper(boolean shouldConstructEntity) {
+    this.shouldConstructEntity = shouldConstructEntity;
+  }
+
+  @Override
+  public Response mapResponse(StatusType status, Object result) {
+    if (shouldConstructEntity) {
+      Flowable<SseEventResponseEntity<?>> responseEntity = ((Flowable<?>) 
result).map(obj ->
+          new SseEventResponseEntity<>()
+              .data(obj));
+      return Response.create(status, responseEntity);
+    }
+    return Response.create(status, result);
+  }
+}
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java
new file mode 100644
index 000000000..9b35751f4
--- /dev/null
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.swagger.invocation.response.producer;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import 
org.apache.servicecomb.swagger.invocation.response.ResponseMapperFactorys;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
+
+public class PublisherProducerResponseMapperFactory implements 
ProducerResponseMapperFactory {
+  @Override
+  public boolean isMatch(Type producerType) {
+    if (!ParameterizedType.class.isAssignableFrom(producerType.getClass())) {
+      return false;
+    }
+
+    return ((ParameterizedType) 
producerType).getRawType().equals(Publisher.class);
+  }
+
+  @Override
+  public ProducerResponseMapper 
createResponseMapper(ResponseMapperFactorys<ProducerResponseMapper> factorys,
+      Type providerType) {
+    Type realProducerType = ((ParameterizedType) 
providerType).getActualTypeArguments()[0];
+    return new PublisherProducerResponseMapper(
+        
!realProducerType.getTypeName().contains(SseEventResponseEntity.class.getName()));
+  }
+}
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java
new file mode 100644
index 000000000..923949de6
--- /dev/null
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.swagger.invocation.sse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import io.swagger.annotations.ApiModelProperty;
+
+public class SseEventResponseEntity<T> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SseEventResponseEntity.class);
+
+  /**
+   * event id
+   */
+  private Integer id;
+
+  /**
+   * event type
+   */
+  private String event;
+
+  /**
+   * reconnection time
+   */
+  private Long retry;
+
+  /**
+   * business data
+   */
+  private List<T> datas = new ArrayList<>();
+
+  public SseEventResponseEntity<T> id(int id) {
+    if (this.id != null) {
+      LOGGER.warn("origin id: [{}] is exists, overridden by the current value: 
[{}]", this.id, id);
+    }
+    this.id = id;
+    return this;
+  }
+
+  public SseEventResponseEntity<T> event(String event) {
+    if (!StringUtils.isEmpty(this.event)) {
+      LOGGER.warn("origin event: [{}] is exists, overridden by the current 
value: [{}]", this.event, event);
+    }
+    this.event = event;
+    return this;
+  }
+
+  public SseEventResponseEntity<T> retry(long retry) {
+    if (this.retry != null) {
+      LOGGER.warn("origin retry: [{}] is exists, overridden by the current 
value: [{}]", this.retry, retry);
+    }
+    this.retry = retry;
+    return this;
+  }
+
+  public SseEventResponseEntity<T> data(T data) {
+    if (data == null) {
+      LOGGER.warn("The data content cannot be null!");
+    } else {
+      datas.add(data);
+    }
+    return this;
+  }
+
+  public Integer getId() {
+    return id;
+  }
+
+  public String getEvent() {
+    return event;
+  }
+
+  public Long getRetry() {
+    return retry;
+  }
+
+  public List<T> getData() {
+    return datas;
+  }
+
+  @JsonIgnore
+  @ApiModelProperty(hidden = true)
+  public boolean isEmpty() {
+    return id == null
+        && event == null
+        && retry == null
+        && datas.isEmpty();
+  }
+}
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java
similarity index 55%
copy from 
foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
copy to 
swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java
index 265ee658e..a72161cf2 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java
@@ -14,27 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.servicecomb.swagger.invocation.sse;
 
-package org.apache.servicecomb.foundation.vertx.http;
+import 
org.apache.servicecomb.swagger.generator.core.processor.response.DefaultResponseTypeProcessor;
 
-import java.util.concurrent.CompletableFuture;
-
-import jakarta.servlet.http.HttpServletResponse;
-import jakarta.servlet.http.Part;
-import jakarta.ws.rs.core.Response.StatusType;
-
-import io.vertx.core.http.HttpHeaders;
-
-public interface HttpServletResponseEx extends HttpServletResponse, 
BodyBufferSupport {
-  StatusType getStatusType();
-
-  void setAttribute(String key, Object value);
-
-  Object getAttribute(String key);
-
-  CompletableFuture<Void> sendPart(Part body);
+public class SseEventResponseEntityProcessor extends 
DefaultResponseTypeProcessor {
+  public SseEventResponseEntityProcessor() {
+    extractActualType = true;
+  }
 
-  default void setChunked(boolean chunked) {
-    setHeader(HttpHeaders.TRANSFER_ENCODING.toString(), 
HttpHeaders.CHUNKED.toString());
+  @Override
+  public Class<?> getProcessType() {
+    return SseEventResponseEntity.class;
   }
 }
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
 
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
index bb2abfe50..965674ee0 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
@@ -17,3 +17,4 @@
 
 org.apache.servicecomb.swagger.invocation.generator.ScbResponseProcessor
 org.apache.servicecomb.swagger.invocation.ws.ServerWebSocketResponseProcessor
+org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntityProcessor
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
 
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
index 0474058a9..5092ed6b3 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
@@ -19,3 +19,4 @@ 
org.apache.servicecomb.swagger.invocation.response.consumer.CseResponseConsumerR
 
org.apache.servicecomb.swagger.invocation.response.consumer.CompletableFutureConsumerResponseMapperFactory
 
org.apache.servicecomb.swagger.invocation.response.consumer.DefaultConsumerResponseMapperFactory
 
org.apache.servicecomb.swagger.invocation.response.consumer.OptionalConsumerResponseMapperFactory
+org.apache.servicecomb.swagger.invocation.response.consumer.PublisherConsumerResponseMapperFactory
\ No newline at end of file
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
 
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
index dac2a90e3..bf675cf96 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
@@ -19,3 +19,4 @@ 
org.apache.servicecomb.swagger.invocation.response.producer.CseResponseProducerR
 
org.apache.servicecomb.swagger.invocation.response.producer.CompletableFutureProducerResponseMapperFactory
 
org.apache.servicecomb.swagger.invocation.response.producer.DefaultProducerResponseMapperFactory
 
org.apache.servicecomb.swagger.invocation.response.producer.OptionalProducerResponseMapperFactory
+org.apache.servicecomb.swagger.invocation.response.producer.PublisherProducerResponseMapperFactory
\ No newline at end of file
diff --git 
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
 
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
index 36e0af598..f973f9e1c 100644
--- 
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
+++ 
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
@@ -58,6 +58,6 @@ public class HighwayProducerInvocationFlow extends 
ProducerInvocationFlow {
   @Override
   protected void sendResponse(Invocation invocation, Response response) {
     HighwayTransportContext transportContext = 
invocation.getTransportContext();
-    connection.write(transportContext.getResponseBuffer().getByteBuf());
+    connection.write(transportContext.getResponseBuffer());
   }
 }
diff --git 
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
 
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 98fab6362..0c9e8e79b 100644
--- 
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ 
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -153,7 +153,7 @@ public class HighwayServerInvoke {
         respBuffer = HighwayCodec.encodeResponse(msgId, header, bodySchema, 
response.getResult());
       }
       invocation.getInvocationStageTrace().finishServerFiltersResponse();
-      connection.write(respBuffer.getByteBuf());
+      connection.write(respBuffer);
     } catch (Exception e) {
       // keep highway performance and simple, this encoding/decoding error not 
need handle by client
       String msg = String.format("encode response failed, %s, msgId=%d",
diff --git a/transports/transport-rest/transport-rest-client/pom.xml 
b/transports/transport-rest/transport-rest-client/pom.xml
index 0a22aaf8c..c2265042a 100644
--- a/transports/transport-rest/transport-rest-client/pom.xml
+++ b/transports/transport-rest/transport-rest-client/pom.xml
@@ -40,6 +40,10 @@
       <groupId>org.apache.servicecomb</groupId>
       <artifactId>common-rest</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.vertx</groupId>
+      <artifactId>vertx-rx-java3</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>io.vertx</groupId>
diff --git 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
index af5ee1d10..7e37ebc9a 100644
--- 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
+++ 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
@@ -18,11 +18,14 @@
 package org.apache.servicecomb.transport.rest.client.http;
 
 import java.util.Collection;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 import jakarta.ws.rs.core.HttpHeaders;
+import jakarta.ws.rs.core.MediaType;
 
 import org.apache.servicecomb.common.rest.RestConst;
+import 
org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor;
 import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
 import 
org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
 import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
@@ -35,12 +38,17 @@ import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.JavaType;
 import com.netflix.config.DynamicPropertyFactory;
 
+import io.reactivex.rxjava3.core.Flowable;
+import io.vertx.core.buffer.Buffer;
+
 public class DefaultHttpClientFilter implements HttpClientFilter {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultHttpClientFilter.class);
 
@@ -76,6 +84,7 @@ public class DefaultHttpClientFilter implements 
HttpClientFilter {
     if (idx != -1) {
       contentTypeForFind = contentType.substring(0, idx);
     }
+
     return restOperation.findProduceProcessor(contentTypeForFind);
   }
 
@@ -84,12 +93,11 @@ public class DefaultHttpClientFilter implements 
HttpClientFilter {
     if (result != null) {
       return Response.create(responseEx.getStatusType(), result);
     }
-
     OperationMeta operationMeta = invocation.getOperationMeta();
     JavaType responseType = 
invocation.findResponseType(responseEx.getStatus());
     RestOperationMeta swaggerRestOperation = 
operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION);
     ProduceProcessor produceProcessor = 
findProduceProcessor(swaggerRestOperation, responseEx);
-    if (produceProcessor == null) {
+    if (produceProcessor == null && !isEventStream(responseEx)) {
       // This happens outside the runtime such as Servlet filter response. 
Here we give a default json parser to it
       // and keep user data not get lose.
       LOGGER.warn("Response content-type {} is not supported. Method {}, path 
{}, statusCode {}, reasonPhrase {}.",
@@ -103,7 +111,16 @@ public class DefaultHttpClientFilter implements 
HttpClientFilter {
     }
 
     try {
-      result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(), 
responseType);
+      if (responseEx.getFlowableBuffer() == null) {
+        result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(), 
responseType);
+      } else {
+        Flowable<Buffer> flowable = responseEx.getFlowableBuffer();
+        ProduceEventStreamProcessor finalProduceProcessor = new 
ProduceEventStreamProcessor();
+        result = flowable.concatMap(buffer -> 
extractFlowableBody(finalProduceProcessor, responseType, buffer))
+            .doFinally(finalProduceProcessor::close)
+            .doOnCancel(finalProduceProcessor::close)
+            .filter(Objects::nonNull);
+      }
       Response response = Response.create(responseEx.getStatusType(), result);
       if (response.isFailed()) {
         LOGGER.warn("invoke operation [{}] failed, status={}, msg={}", 
invocation.getMicroserviceQualifiedName(),
@@ -130,6 +147,16 @@ public class DefaultHttpClientFilter implements 
HttpClientFilter {
     }
   }
 
+  private boolean isEventStream(HttpServletResponseEx responseEx) {
+    return responseEx.getHeader(HttpHeaders.CONTENT_TYPE) != null
+        && 
responseEx.getHeader(HttpHeaders.CONTENT_TYPE).contains(MediaType.SERVER_SENT_EVENTS);
+  }
+
+  protected Publisher<SseEventResponseEntity<?>> 
extractFlowableBody(ProduceEventStreamProcessor produceProcessor,
+      JavaType responseType, Buffer buffer) throws Exception {
+    return produceProcessor.decodeResponse(buffer, responseType);
+  }
+
   @Override
   public Response afterReceiveResponse(Invocation invocation, 
HttpServletResponseEx responseEx) {
     Response response = extractResponse(invocation, responseEx);
diff --git 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index f749f1173..1cb7c20e7 100644
--- 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++ 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
+import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response.Status;
 
 import org.apache.commons.lang3.StringUtils;
@@ -53,6 +54,7 @@ import 
org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.reactivex.rxjava3.core.Flowable;
 import io.vertx.core.Future;
 import io.vertx.core.Handler;
 import io.vertx.core.buffer.Buffer;
@@ -60,6 +62,7 @@ import io.vertx.core.http.HttpClientRequest;
 import io.vertx.core.http.HttpClientResponse;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.RequestOptions;
+import io.vertx.rxjava3.FlowableHelper;
 
 public class RestClientInvocation {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RestClientInvocation.class);
@@ -87,6 +90,8 @@ public class RestClientInvocation {
 
   private boolean alreadyFailed = false;
 
+  protected static final String EVENTS_MEDIA_TYPE = 
MediaType.SERVER_SENT_EVENTS;
+
   public RestClientInvocation(HttpClientWithContext httpClientWithContext, 
List<HttpClientFilter> httpClientFilters) {
     this.httpClientWithContext = httpClientWithContext;
     this.httpClientFilters = httpClientFilters;
@@ -218,6 +223,11 @@ public class RestClientInvocation {
       return;
     }
 
+    if (isServerSendEvents(httpClientResponse)) {
+      
processFlowableResponseBody(FlowableHelper.toFlowable(httpClientResponse));
+      return;
+    }
+
     httpClientResponse.exceptionHandler(e -> {
       invocation.getTraceIdLogger().error(LOGGER, "Failed to receive response, 
local:{}, remote:{}, message={}.",
           getLocalAddress(), httpClientResponse.netSocket().remoteAddress(),
@@ -228,17 +238,38 @@ public class RestClientInvocation {
     clientResponse.bodyHandler(this::processResponseBody);
   }
 
+  private boolean isServerSendEvents(HttpClientResponse httpClientResponse) {
+    if (httpClientResponse.getHeader("Content-Type") == null) {
+      return false;
+    }
+    return 
httpClientResponse.getHeader("Content-Type").contains(EVENTS_MEDIA_TYPE);
+  }
+
   /**
    * after this method, connection will be recycled to connection pool
    * @param responseBuf response body buffer, when download, responseBuf is 
null, because download data by ReadStreamPart
    */
   protected void processResponseBody(Buffer responseBuf) {
+    HttpServletResponseEx responseEx =
+        new VertxClientResponseToHttpServletResponse(clientResponse, 
responseBuf);
+    doProcessResponseBody(responseEx);
+  }
+
+  /**
+   * after this method, connection will be recycled to connection pool
+   * @param flowable sse flowable response
+   */
+  protected void processFlowableResponseBody(Flowable<Buffer> flowable) {
+    HttpServletResponseEx responseEx =
+        new VertxClientResponseToHttpServletResponse(clientResponse, flowable);
+    doProcessResponseBody(responseEx);
+  }
+
+  private void doProcessResponseBody(HttpServletResponseEx responseEx) {
     invocation.getInvocationStageTrace().finishReceiveResponse();
     invocation.getResponseExecutor().execute(() -> {
       try {
         invocation.getInvocationStageTrace().startClientFiltersResponse();
-        HttpServletResponseEx responseEx =
-            new VertxClientResponseToHttpServletResponse(clientResponse, 
responseBuf);
         for (HttpClientFilter filter : httpClientFilters) {
           if (filter.enabled()) {
             Response response = filter.afterReceiveResponse(invocation, 
responseEx);
diff --git 
a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
 
b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
index 5c0b891e4..2b5192a54 100644
--- 
a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
+++ 
b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
@@ -141,6 +141,8 @@ public class TestDefaultHttpClientFilter {
         result = 400;
         responseEx.getBodyBuffer();
         result = new BufferImpl().appendString("abc");
+        responseEx.getFlowableBuffer();
+        result = null;
       }
     };
     new MockUp<DefaultHttpClientFilter>() {
@@ -189,6 +191,8 @@ public class TestDefaultHttpClientFilter {
         result = 200;
         responseEx.getBodyBuffer();
         result = new BufferImpl().appendString("abc");
+        responseEx.getFlowableBuffer();
+        result = null;
       }
     };
     new MockUp<DefaultHttpClientFilter>() {
@@ -237,6 +241,8 @@ public class TestDefaultHttpClientFilter {
         result = Status.FORBIDDEN;
         responseEx.getBodyBuffer();
         result = Buffer.buffer(JsonUtils.writeValueAsString(data).getBytes());
+        responseEx.getFlowableBuffer();
+        result = null;
       }
     };
 
@@ -284,6 +290,8 @@ public class TestDefaultHttpClientFilter {
 
         responseEx.getStatusType();
         result = Status.OK;
+        responseEx.getFlowableBuffer();
+        result = null;
       }
     };
 

Reply via email to