This is an automated email from the ASF dual-hosted git repository.
youling1128 pushed a commit to branch 2.9.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/2.9.x by this push:
new cacfb7924 [#4873] Support for SSE interface RPC calls (#4875)
cacfb7924 is described below
commit cacfb79249536f152d341e174c191472f2eb5c4b
Author: Alex <[email protected]>
AuthorDate: Wed Aug 20 20:35:55 2025 +0800
[#4873] Support for SSE interface RPC calls (#4875)
---
.github/workflows/maven.yml | 2 +-
.../common/rest/AbstractRestInvocation.java | 2 +-
.../common/rest/RestProducerInvocationFlow.java | 10 +-
.../codec/produce/ProduceEventStreamProcessor.java | 412 +++++++++++++++++++++
.../common/rest/definition/RestOperationMeta.java | 8 +
.../rest/filter/inner/RestServerCodecFilter.java | 4 +-
.../rest/filter/inner/ServerRestArgsFilter.java | 92 ++++-
...comb.common.rest.codec.produce.ProduceProcessor | 3 +-
.../common/rest/TestAbstractRestInvocation.java | 4 +-
.../produce/TestProduceEventStreamProcessor.java | 315 ++++++++++++++++
demo/demo-cse-v2/gateway/pom.xml | 14 +
demo/demo-cse-v2/provider/pom.xml | 4 +
.../org/apache/servicecomb/demo/model/Model.java | 37 +-
.../springmvc/client/ReactiveStreamIT.java | 205 ++++++++++
.../springmvc/client/ThirdSvcConfiguration.java | 53 +++
.../springmvc/server/ReactiveStreamController.java | 73 ++++
dependencies/default/pom.xml | 17 +
foundations/foundation-vertx/pom.xml | 4 +
.../vertx/client/tcp/TcpClientConnection.java | 2 +-
.../vertx/http/AbstractHttpServletResponse.java | 13 +-
.../vertx/http/HttpServletResponseEx.java | 15 +
.../vertx/http/StandardHttpServletResponseEx.java | 14 +-
.../VertxClientResponseToHttpServletResponse.java | 13 +
.../VertxServerResponseToHttpServletResponse.java | 26 +-
.../vertx/stream/BufferOutputStream.java | 36 +-
.../foundation/vertx/tcp/TcpConnection.java | 9 +-
.../servicecomb/foundation/vertx/TestStream.java | 2 +-
.../vertx/client/tcp/TestTcpClientConnection.java | 8 +-
.../http/TestAbstractHttpServletResponse.java | 3 +-
.../http/TestStandardHttpServletResponseEx.java | 6 +-
...stVertxServerResponseToHttpServletResponse.java | 8 +-
swagger/swagger-generator/generator-core/pom.xml | 4 +
.../processor/response/PublisherProcessor.java | 50 +++
...icecomb.swagger.generator.ResponseTypeProcessor | 1 +
swagger/swagger-invocation/invocation-core/pom.xml | 8 +
.../consumer/PublisherConsumerResponseMapper.java | 33 +-
.../PublisherConsumerResponseMapperFactory.java | 43 +++
.../producer/PublisherProducerResponseMapper.java | 42 +++
.../PublisherProducerResponseMapperFactory.java | 43 +++
.../invocation/sse/SseEventResponseEntity.java | 111 ++++++
.../sse/SseEventResponseEntityProcessor.java | 28 +-
...icecomb.swagger.generator.ResponseTypeProcessor | 1 +
...response.consumer.ConsumerResponseMapperFactory | 1 +
...response.producer.ProducerResponseMapperFactory | 1 +
.../highway/HighwayProducerInvocationFlow.java | 2 +-
.../transport/highway/HighwayServerInvoke.java | 2 +-
.../transport-rest/transport-rest-client/pom.xml | 4 +
.../rest/client/http/DefaultHttpClientFilter.java | 33 +-
.../rest/client/http/RestClientInvocation.java | 35 +-
.../client/http/TestDefaultHttpClientFilter.java | 8 +
50 files changed, 1749 insertions(+), 115 deletions(-)
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 3d5f372d7..5788af888 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -36,7 +36,7 @@ jobs:
- name: Set up jdk
uses: actions/setup-java@v3
with:
- java-version: '21'
+ java-version: '17'
distribution: 'temurin'
- name: Set up Maven
uses: stCarolas/[email protected]
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 1f3d18766..ede3e94d4 100644
---
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -351,7 +351,7 @@ public abstract class AbstractRestInvocation {
if (!(response.getResult() instanceof ServerWebSocket)) {
try {
- responseEx.flushBuffer();
+ responseEx.endResponse();
} catch (Throwable flushException) {
LOGGER.error("Failed to flush rest response, operation:{}, request
uri:{}",
getMicroserviceQualifiedName(), requestEx.getRequestURI(),
flushException);
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
index 70c3a5a3a..673e45ee2 100644
---
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
@@ -53,7 +53,7 @@ public class RestProducerInvocationFlow extends
ProducerInvocationFlow {
requestEx.getRequestURI(), e);
}
- flushResponse("UNKNOWN_OPERATION");
+ endResponse("UNKNOWN_OPERATION");
return null;
}
@@ -61,16 +61,16 @@ public class RestProducerInvocationFlow extends
ProducerInvocationFlow {
protected void sendResponse(Invocation invocation, Response response) {
if (isDownloadFileResponseType(invocation, response)) {
responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()))
- .whenComplete((r, e) ->
flushResponse(invocation.getMicroserviceQualifiedName()));
+ .whenComplete((r, e) ->
endResponse(invocation.getMicroserviceQualifiedName()));
return;
}
- flushResponse(invocation.getMicroserviceQualifiedName());
+ endResponse(invocation.getMicroserviceQualifiedName());
}
- private void flushResponse(String operationName) {
+ private void endResponse(String operationName) {
try {
- responseEx.flushBuffer();
+ responseEx.endResponse();
} catch (Throwable flushException) {
LOGGER.error("Failed to flush rest response, operation:{}, request
uri:{}",
operationName, requestEx.getRequestURI(), flushException);
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
new file mode 100644
index 000000000..109a460cd
--- /dev/null
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.common.rest.codec.produce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
+import org.apache.servicecomb.foundation.vertx.stream.BufferInputStream;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.reactivex.rxjava3.core.Flowable;
+import io.vertx.core.buffer.Buffer;
+import jakarta.ws.rs.core.MediaType;
+
+public class ProduceEventStreamProcessor implements ProduceProcessor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProduceEventStreamProcessor.class);
+
+ private static final String CR_STR = "\r";
+
+ private static final byte[] CR = CR_STR.getBytes(StandardCharsets.UTF_8);
+
+ private static final String LF_STR = "\n";
+
+ private static final byte[] LF = LF_STR.getBytes(StandardCharsets.UTF_8);
+
+ private static final String CRLF_STR = "\r\n";
+
+ private static final byte[] CRLF = CRLF_STR.getBytes(StandardCharsets.UTF_8);
+
+ private String lineDelimiter;
+
+ private byte[] lineDelimiterBytes;
+
+ private int writeIndex = 0;
+
+ @Override
+ public String getName() {
+ return MediaType.SERVER_SENT_EVENTS;
+ }
+
+ @Override
+ public int getOrder() {
+ return 0;
+ }
+
+ @Override
+ public void doEncodeResponse(OutputStream output, Object result) throws
Exception {
+ StringBuilder eventBuilder = new StringBuilder();
+ if (result instanceof SseEventResponseEntity<?> responseEntity) {
+ appendId(eventBuilder, responseEntity.getId());
+ appendEvent(eventBuilder, responseEntity.getEvent());
+ appendRetry(eventBuilder, responseEntity.getRetry());
+ appendData(eventBuilder, responseEntity.getData());
+ eventBuilder.append("\n");
+ output.write(eventBuilder.toString().getBytes(StandardCharsets.UTF_8));
+ } else {
+ LOGGER.warn("Does not support encoding objects other than
SseEventResponseEntity!");
+ }
+ }
+
+ private enum ProcessStatus {
+ DETERMINE_LINE_DELIMITER,
+ MATCHING_CR,
+ MATCHING_LF,
+ MATCHING_CRLF,
+ MATCHING_LINE,
+ END_OF_MESSAGE,
+ /**
+ * The whole SSE stream is closed.
+ * Be careful: there may be remaining buffer should be processed.
+ */
+ END_OF_STREAM
+ }
+
+ private ProcessStatus loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
+
+ private int matchingDelimiterIndex = 0;
+
+ final ByteBuf buffer = Unpooled.buffer();
+
+ private SseEventResponseEntity<?> currentEntity = new
SseEventResponseEntity<>();
+
+ private List<SseEventResponseEntity<?>> entityList = new ArrayList<>();
+
+ private JavaType type;
+
+ @Override
+ public List<SseEventResponseEntity<?>> doDecodeResponse(InputStream input,
JavaType type) throws Exception {
+ this.type = type;
+ final byte[] readCache = new byte[Math.min(128, input.available())];
+ int bytesRead;
+ while ((bytesRead = input.read(readCache)) > 0) {
+ processAllBytes(readCache, bytesRead);
+ }
+ final List<SseEventResponseEntity<?>> resultList = entityList;
+ entityList = new ArrayList<>();
+ return resultList;
+ }
+
+ private void processAllBytes(byte[] readCache, int cacheEndPos) {
+ int lastProcessedPosition = innerLoop(readCache, 0, cacheEndPos);
+ while (lastProcessedPosition < cacheEndPos) {
+ lastProcessedPosition = innerLoop(readCache, lastProcessedPosition,
cacheEndPos);
+ }
+ }
+
+ private int innerLoop(final byte[] readCache, final int startPos, final int
cacheEndPos) {
+ if (startPos >= cacheEndPos) {
+ return cacheEndPos;
+ }
+ switch (loopStatus) {
+ case MATCHING_CR -> {
+ return tryToMatchDelimiterCR(readCache, startPos, cacheEndPos);
+ }
+ case MATCHING_CRLF -> {
+ return tryToMatchDelimiterCRLF(readCache, startPos, cacheEndPos);
+ }
+ case MATCHING_LF -> {
+ return tryToMatchDelimiterLF(readCache, startPos, cacheEndPos);
+ }
+ case DETERMINE_LINE_DELIMITER -> {
+ return searchFirstLineDelimiter(readCache, startPos, cacheEndPos);
+ }
+ case MATCHING_LINE -> {
+ return bufferReadCacheAndProcessLines(readCache, startPos,
cacheEndPos);
+ }
+ case END_OF_STREAM -> {
+ return processLeftBuffer(cacheEndPos);
+ }
+ default -> throw new IllegalStateException("unexpected case");
+ }
+ }
+
+ private int processLeftBuffer(int cacheEndPos) {
+ final byte[] bytes = readAllBytesFromBuffer(buffer);
+ final String bufferStr = new String(bytes, StandardCharsets.UTF_8);
+ processStringBuffer(bufferStr);
+ return cacheEndPos;
+ }
+
+ private int bufferReadCacheAndProcessLines(byte[] readCache, int startPos,
int cacheEndPos) {
+ buffer.writeBytes(readCache, startPos, cacheEndPos - startPos);
+ processAllAvailableBufferLines();
+ return cacheEndPos;
+ }
+
+ private int tryToMatchDelimiterCR(byte[] readCache, int startPos, int
cacheEndPos) {
+ int bytesProcessed = 0;
+ for (; matchingDelimiterIndex < CR.length && startPos + bytesProcessed <
cacheEndPos; ++bytesProcessed) {
+ if (readCache[startPos + bytesProcessed] == CR[matchingDelimiterIndex]) {
+ buffer.writeByte(readCache[startPos + bytesProcessed]);
+ ++matchingDelimiterIndex;
+ } else {
+ loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
+ matchingDelimiterIndex = 0;
+ return startPos + bytesProcessed;
+ }
+ }
+ if (matchingDelimiterIndex == CR.length) {
+ // matched all CR bytes, attempting to further match CRLF.
+ loopStatus = ProcessStatus.MATCHING_CRLF;
+ }
+ return startPos + bytesProcessed;
+ }
+
+ private int tryToMatchDelimiterCRLF(byte[] readCache, int startPos, int
cacheEndPos) {
+ // If you enter this branch, it means that at least CR should be used as
the line break character.
+ int bytesProcessed = 0;
+ for (; matchingDelimiterIndex < CRLF.length && startPos + bytesProcessed <
cacheEndPos; ++bytesProcessed) {
+ if (readCache[startPos + bytesProcessed] ==
CRLF[matchingDelimiterIndex]) {
+ buffer.writeByte(readCache[startPos + bytesProcessed]);
+ ++matchingDelimiterIndex;
+ } else {
+ determineDelimiter(CR_STR, CR);
+ return startPos + bytesProcessed;
+ }
+ }
+ if (matchingDelimiterIndex == CRLF.length) {
+ determineDelimiter(CRLF_STR, CRLF);
+ }
+ return startPos + bytesProcessed;
+ }
+
+ private int tryToMatchDelimiterLF(byte[] readCache, int startPos, int
cacheEndPos) {
+ int bytesProcessed = 0;
+ for (; matchingDelimiterIndex < LF.length && startPos + bytesProcessed <
cacheEndPos; ++bytesProcessed) {
+ if (readCache[startPos + bytesProcessed] == LF[matchingDelimiterIndex]) {
+ buffer.writeByte(readCache[startPos + bytesProcessed]);
+ ++matchingDelimiterIndex;
+ } else {
+ loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER;
+ matchingDelimiterIndex = 0;
+ return startPos + bytesProcessed;
+ }
+ }
+ if (matchingDelimiterIndex == LF.length) {
+ determineDelimiter(LF_STR, LF);
+ }
+ return startPos + bytesProcessed;
+ }
+
+ private void determineDelimiter(String delimiterStr, byte[] delimiterBytes) {
+ lineDelimiter = delimiterStr;
+ lineDelimiterBytes = delimiterBytes;
+ matchingDelimiterIndex = 0;
+ loopStatus = ProcessStatus.MATCHING_LINE;
+ }
+
+ private int searchFirstLineDelimiter(byte[] readCache, int startPos, int
cacheEndPos) {
+ for (int i = startPos; i < cacheEndPos; ++i) {
+ if (readCache[i] == CR[0]) {
+ loopStatus = ProcessStatus.MATCHING_CR;
+ matchingDelimiterIndex = 0;
+ return i;
+ } else if (readCache[i] == LF[0]) {
+ loopStatus = ProcessStatus.MATCHING_LF;
+ matchingDelimiterIndex = 0;
+ return i;
+ } else {
+ buffer.writeByte(readCache[i]);
+ }
+ }
+ return cacheEndPos;
+ }
+
+ private void processAllAvailableBufferLines() {
+ while (buffer.readableBytes() > 0) {
+ final byte[] bytes = readALineOfBytesFromBuffer(buffer);
+ if (bytes == null || bytes.length == 0) {
+ return;
+ }
+ final String bufferStr = new String(bytes, StandardCharsets.UTF_8);
+ processStringBuffer(bufferStr);
+ }
+ }
+
+ private void processStringBuffer(String bufferStr) {
+ int cursor = 0;
+ int delimiterIdx;
+ while ((delimiterIdx = bufferStr.indexOf(lineDelimiter, cursor)) >= 0) {
+ final String line = bufferStr.substring(cursor, delimiterIdx);
+ processStringLine(line);
+ cursor = delimiterIdx + lineDelimiter.length();
+ }
+ if (cursor < bufferStr.length()) {
+
buffer.writeBytes(bufferStr.substring(cursor).getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ private void processStringLine(String line) {
+ if (StringUtils.isBlank(line)) {
+ if (currentEntity.isEmpty()) {
+ return;
+ }
+ entityList.add(currentEntity);
+ currentEntity = new SseEventResponseEntity<>();
+ return;
+ }
+ final String[] split = line.split(":", 2);
+ if (split.length < 2) {
+ LOGGER.error("get a line of sse event without colon! stream is
breaking!");
+ throw new IllegalStateException("get a line of sse event without
colon!");
+ }
+ switch (split[0]) {
+ case "event" -> {
+ if (StringUtils.isNotBlank(split[1])) {
+ currentEntity.event(split[1].trim());
+ }
+ }
+ case "id" -> {
+ if (StringUtils.isNotBlank(split[1])) {
+ currentEntity.id(Integer.parseInt(split[1].trim()));
+ }
+ }
+ case "data" -> {
+ try {
+
currentEntity.data(RestObjectMapperFactory.getRestObjectMapper().readValue(split[1].trim(),
type));
+ } catch (JsonProcessingException e) {
+ LOGGER.error("failed to process data of sse event: [{}]",
e.getMessage());
+ throw new IllegalStateException("failed to process data of sse
event", e);
+ }
+ }
+ case "retry" -> {
+ if (StringUtils.isNotBlank(split[1])) {
+ currentEntity.retry(Long.parseLong(split[1].trim()));
+ }
+ }
+ default -> {
+ LOGGER.debug("unrecognized sse message line! ignored string segment
length=[{}]", line.length());
+ }
+ }
+ }
+
+ private byte[] readALineOfBytesFromBuffer(ByteBuf buffer) {
+ matchingDelimiterIndex = 0;
+ try (final ByteArrayOutputStream bos = new
ByteArrayOutputStream(buffer.readableBytes())) {
+ while (buffer.readableBytes() > 0 && matchingDelimiterIndex <
lineDelimiterBytes.length) {
+ final byte b = buffer.readByte();
+ if (b == lineDelimiterBytes[matchingDelimiterIndex]) {
+ ++matchingDelimiterIndex;
+ }
+ bos.write(b);
+ }
+ if (matchingDelimiterIndex < lineDelimiterBytes.length) {
+ // The newline character was not matched, so this part of the buffer
does not constitute a complete line of
+ // content and needs to remain in the buffer, waiting for the next
segment to arrive for processing.
+ buffer.writeBytes(bos.toByteArray());
+ return null;
+ }
+ matchingDelimiterIndex = 0;
+ return bos.toByteArray();
+ } catch (IOException e) {
+ throw new IllegalStateException("impossible error while closing
ByteArrayOutputStream", e);
+ }
+ }
+
+ private byte[] readAllBytesFromBuffer(ByteBuf buffer) {
+ try (final ByteArrayOutputStream bos = new
ByteArrayOutputStream(buffer.readableBytes())) {
+ buffer.readBytes(bos, buffer.readableBytes());
+ return bos.toByteArray();
+ } catch (IOException e) {
+ throw new IllegalStateException("impossible error while closing
ByteArrayOutputStream", e);
+ }
+ }
+
+ private void appendId(StringBuilder eventBuilder, Integer eventId) {
+ int id = eventId != null ? eventId : writeIndex++;
+ eventBuilder.append("id: ").append(id).append("\n");
+ }
+
+ private void appendEvent(StringBuilder eventBuilder, String event) {
+ if (StringUtils.isEmpty(event)) {
+ return;
+ }
+ eventBuilder.append("event: ").append(event).append("\n");
+ }
+
+ private void appendRetry(StringBuilder eventBuilder, Long retry) {
+ if (retry == null) {
+ return;
+ }
+ eventBuilder.append("retry: ").append(retry.longValue()).append("\n");
+ }
+
+ private void appendData(StringBuilder eventBuilder, List<?> datas) throws
Exception {
+ if (CollectionUtils.isEmpty(datas)) {
+ throw new Exception("sse response data is null!");
+ }
+ for (Object data : datas) {
+ eventBuilder.append("data: ")
+
.append(RestObjectMapperFactory.getRestObjectMapper().writeValueAsString(data))
+ .append("\n");
+ }
+ }
+
+ @Override
+ public Publisher<SseEventResponseEntity<?>> decodeResponse(Buffer buffer,
JavaType type) throws Exception {
+ if (buffer.length() == 0) {
+ return Flowable.empty();
+ }
+
+ try (BufferInputStream input = new BufferInputStream(buffer.getByteBuf()))
{
+ final List<SseEventResponseEntity<?>> list = doDecodeResponse(input,
type);
+ return Flowable.fromIterable(list);
+ }
+ }
+
+ public Publisher<SseEventResponseEntity<?>> close() throws Exception {
+ if (type == null) {
+ return Flowable.empty();
+ }
+ try (final ByteArrayInputStream input = new ByteArrayInputStream(
+ (lineDelimiter + lineDelimiter).getBytes(StandardCharsets.UTF_8))) {
+ // Write two additional newline characters into the buffer to ensure
that the processor completes
+ // processing all remaining content in the buffer.
+ final List<SseEventResponseEntity<?>> list = doDecodeResponse(input,
type);
+ return Flowable.fromIterable(list);
+ }
+ }
+}
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
index 5061ef783..9da6d06f3 100644
---
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
@@ -88,6 +88,8 @@ public class RestOperationMeta {
// 快速构建URL path
private URLPathBuilder pathBuilder;
+ protected static final String EVENTS_MEDIA_TYPE =
MediaType.SERVER_SENT_EVENTS;
+
public void init(OperationMeta operationMeta) {
this.operationMeta = operationMeta;
@@ -258,6 +260,12 @@ public class RestOperationMeta {
ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass));
} else {
for (String produce : produces) {
+ if (produce.contains(EVENTS_MEDIA_TYPE)) {
+ // When the produce type is event-stream, the
ProduceEventStreamProcessor implementation class corresponding
+ // to event-stream is not added, and it is set to the default type
ProduceJsonProcessor.
+ // In case of an exception, the response result is parsed.
+ continue;
+ }
if (produce.contains(";")) {
produce = produce.substring(0, produce.indexOf(";"));
}
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
index eaf62b632..f556ffa05 100644
---
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java
@@ -44,8 +44,8 @@ import
org.apache.servicecomb.foundation.vertx.stream.BufferOutputStream;
import org.apache.servicecomb.swagger.invocation.Response;
import org.springframework.stereotype.Component;
-import io.netty.buffer.Unpooled;
import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
@Component
public class RestServerCodecFilter implements ProducerFilter {
@@ -103,7 +103,7 @@ public class RestServerCodecFilter implements
ProducerFilter {
}
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
- try (BufferOutputStream output = new
BufferOutputStream(Unpooled.compositeBuffer())) {
+ try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
produceProcessor.encodeResponse(output, response.getResult());
responseEx.setBodyBuffer(output.getBuffer());
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
index bef5b48a4..281769d75 100644
---
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
@@ -19,11 +19,13 @@ package org.apache.servicecomb.common.rest.filter.inner;
import static
org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter.isDownloadFileResponseType;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.servicecomb.common.rest.RestConst;
import org.apache.servicecomb.common.rest.codec.RestCodec;
+import
org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
@@ -36,15 +38,22 @@ import
org.apache.servicecomb.foundation.vertx.stream.BufferOutputStream;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.netflix.config.DynamicPropertyFactory;
-import io.netty.buffer.Unpooled;
+import io.vertx.core.buffer.Buffer;
public class ServerRestArgsFilter implements HttpServerFilter {
private static final boolean enabled =
DynamicPropertyFactory.getInstance().getBooleanProperty
("servicecomb.http.filter.server.serverRestArgs.enabled", true).get();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ServerRestArgsFilter.class);
+
@Override
public int getOrder() {
return -100;
@@ -74,10 +83,15 @@ public class ServerRestArgsFilter implements
HttpServerFilter {
return responseEx.sendPart(PartUtils.getSinglePart(null,
response.getResult()));
}
- responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
+ if (isServerSendEvent(response)) {
+ produceProcessor = new ProduceEventStreamProcessor();
+ responseEx.setContentType(produceProcessor.getName() + ";
charset=utf-8");
+ return writeServerSendEvent(invocation, response, produceProcessor,
responseEx);
+ }
+ responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
CompletableFuture<Void> future = new CompletableFuture<>();
- try (BufferOutputStream output = new
BufferOutputStream(Unpooled.compositeBuffer())) {
+ try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
if (failed) {
produceProcessor.encodeResponse(output, ((InvocationException)
response.getResult()).getErrorData());
} else {
@@ -91,4 +105,76 @@ public class ServerRestArgsFilter implements
HttpServerFilter {
}
return future;
}
+
+ public static boolean isServerSendEvent(Response response) {
+ return response.getResult() instanceof Publisher<?>;
+ }
+
+ private static CompletableFuture<Void> writeServerSendEvent(Invocation
invocation, Response response,
+ ProduceProcessor produceProcessor, HttpServletResponseEx responseEx) {
+ responseEx.setChunkedForEvent(true);
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ Publisher<?> publisher = response.getResult();
+ publisher.subscribe(new Subscriber<Object>() {
+ Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ s.request(1);
+ subscription = s;
+ }
+
+ @Override
+ public void onNext(Object o) {
+ try {
+ writeResponse(responseEx, produceProcessor, o,
response).whenComplete((r, e) -> {
+ if (e != null) {
+ subscription.cancel();
+ result.completeExceptionally(e);
+ return;
+ }
+ subscription.request(1);
+ });
+ } catch (Throwable e) {
+ LOGGER.warn("Failed to subscribe event: {}", o, e);
+ result.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ result.completeExceptionally(t);
+ }
+
+ @Override
+ public void onComplete() {
+ result.complete(null);
+ }
+ });
+ return result;
+ }
+
+ private static CompletableFuture<Response> writeResponse(
+ HttpServletResponseEx responseEx, ProduceProcessor produceProcessor,
Object data, Response response) {
+ try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
+ produceProcessor.encodeResponse(output, data);
+ CompletableFuture<Response> result = new CompletableFuture<>();
+ responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> {
+ if (e != null) {
+ result.completeExceptionally(e);
+ }
+ try {
+ responseEx.flushBuffer();
+ } catch (IOException ex) {
+ LOGGER.warn("Failed to flush buffer for Server Send Events", ex);
+ }
+ });
+ result.complete(response);
+ return result;
+ } catch (Throwable e) {
+ LOGGER.error("internal service error must be fixed.", e);
+ responseEx.setStatus(500);
+ return CompletableFuture.failedFuture(e);
+ }
+ }
}
diff --git
a/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
b/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
index 9353b9dce..a05c6126d 100644
---
a/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
+++
b/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor
@@ -16,4 +16,5 @@
#
org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor
-org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
\ No newline at end of file
+org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
+org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor
\ No newline at end of file
diff --git
a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index 0d327ae06..f0ac33456 100644
---
a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -916,7 +916,7 @@ public class TestAbstractRestInvocation {
}
@Override
- public void flushBuffer() {
+ public void endResponse() {
endCount.value = endCount.value + 1;
}
@@ -956,7 +956,7 @@ public class TestAbstractRestInvocation {
}
@Override
- public void flushBuffer() {
+ public void endResponse() {
endCount.value = endCount.value + 1;
}
diff --git
a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java
new file mode 100644
index 000000000..c1cbf95b9
--- /dev/null
+++
b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.common.rest.codec.produce;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JavaType;
+
+import io.reactivex.rxjava3.core.Flowable;
+
+/**
+ * Test ProduceEventStreamProcessor
+ *
+ * @since 2025-08-19
+ */
+public class TestProduceEventStreamProcessor {
+ @Test
+ public void doDecodeResponse() throws Exception {
+ doDecodeResponseTemplateComposite("\n");
+ doDecodeResponseTemplateComposite("\r");
+ doDecodeResponseTemplateComposite("\r\n");
+ }
+
+ private void doDecodeResponseTemplateComposite(String lineDelimiter) throws
Exception {
+ doDecodeResponseTemplate(lineDelimiter, 2);
+ doDecodeResponseTemplate(lineDelimiter, 1);
+ doDecodeResponseTemplate(lineDelimiter, 0);
+ }
+
+ private void doDecodeResponseTemplate(String lineDelimiter, int
delimiterInTheFirstSegment) throws Exception {
+ final ProduceEventStreamProcessor processor = new
ProduceEventStreamProcessor();
+ final ByteArrayInputStream stream0 = prepareStream(
+ "id: 0" + lineDelimiter + "data: \"aaa\"" +
lineDelimiter.repeat(delimiterInTheFirstSegment));
+ final Object o0 = processor.doDecodeResponse(stream0,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ if (delimiterInTheFirstSegment > 1) {
+ checkDecodeResult(o0, new ItemChecker(0,
Collections.singletonList("aaa"), null, null));
+ } else {
+ checkDecodeResult(o0);
+ }
+
+ final ByteArrayInputStream stream1 = prepareStream(
+ lineDelimiter.repeat(2 - delimiterInTheFirstSegment) + "id: 1" +
lineDelimiter + "data: \"bbb\""
+ + lineDelimiter);
+ final Object o1 = processor.doDecodeResponse(stream1,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ if (delimiterInTheFirstSegment > 1) {
+ checkDecodeResult(o1);
+ } else {
+ checkDecodeResult(o1, new ItemChecker(0,
Collections.singletonList("aaa"), null, null));
+ }
+
+ final ByteArrayInputStream stream2 = prepareStream(
+ lineDelimiter + "id: 2" + lineDelimiter + "data: \"ccc\"" +
lineDelimiter + "event: test" + lineDelimiter
+ + "retry: 123");
+ final Object o2 = processor.doDecodeResponse(stream2,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ checkDecodeResult(o2,
+ new ItemChecker(1, Collections.singletonList("bbb"), null, null));
+
+ final ByteArrayInputStream stream3 = prepareStream(
+ lineDelimiter + lineDelimiter + "id: 3" + lineDelimiter + "data:
\"ddd\"" + lineDelimiter
+ + "event: test3" + lineDelimiter + "retry: 321" + lineDelimiter);
+ final Object o3 = processor.doDecodeResponse(stream3,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ checkDecodeResult(o3,
+ new ItemChecker(2, Collections.singletonList("ccc"), "test", 123L));
+ final Object o4 = ((Flowable) processor.close()).toList().blockingGet();
+ checkDecodeResult(o4, new ItemChecker(3, Collections.singletonList("ddd"),
"test3", 321L));
+ }
+
+ @Test
+ public void doDecodeResponseMultiData() throws Exception {
+ doDecodeResponseMultiDataTemplate("\n");
+ doDecodeResponseMultiDataTemplate("\r");
+ doDecodeResponseMultiDataTemplate("\r\n");
+ }
+
+ private void doDecodeResponseMultiDataTemplate(String lineDelimiter) throws
Exception {
+ final ProduceEventStreamProcessor processor = new
ProduceEventStreamProcessor();
+
+ final ByteArrayInputStream stream0 = prepareStream(
+ "id: 0" + lineDelimiter + "data: \"aaa1\"" + lineDelimiter + "data:
\"aaa2\"" + lineDelimiter + lineDelimiter);
+ final Object o0 = processor.doDecodeResponse(stream0,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ checkDecodeResult(o0, new ItemChecker(0, Arrays.asList("aaa1", "aaa2"),
null, null));
+
+ final ByteArrayInputStream stream1 = prepareStream(
+ "id: 1" + lineDelimiter + "data: \"aaa3\"" + lineDelimiter + "data:
\"aaa4\"" + lineDelimiter + "data: \"aaa5\""
+ + lineDelimiter + "data: \"aaa6\"");
+ final Object o1 = processor.doDecodeResponse(stream1,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ checkDecodeResult(o1);
+ final Object o2 = ((Flowable) processor.close()).toList().blockingGet();
+ checkDecodeResult(o2, new ItemChecker(1, Arrays.asList("aaa3", "aaa4",
"aaa5", "aaa6"), null, null));
+ }
+
+ @Test
+ public void doDecodeResponseMultiPackage() throws Exception {
+ doDecodeResponseMultiPackageTemplate("\n");
+ doDecodeResponseMultiPackageTemplate("\r");
+ doDecodeResponseMultiPackageTemplate("\r\n");
+ }
+
+ private void doDecodeResponseMultiPackageTemplate(String lineDelimiter)
throws Exception {
+ final ProduceEventStreamProcessor processor = new
ProduceEventStreamProcessor();
+
+ final ByteArrayInputStream stream0 = prepareStream(
+ "id: 0" + lineDelimiter + "data: \"aaa\"" + lineDelimiter +
lineDelimiter + lineDelimiter);
+ final Object o0 = processor.doDecodeResponse(stream0,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ checkDecodeResult(o0, new ItemChecker(0, Collections.singletonList("aaa"),
null, null));
+
+ final ByteArrayInputStream stream1 = prepareStream("id: 1" + lineDelimiter
+ "data: \"bbb\"" + lineDelimiter);
+ final Object o1 = processor.doDecodeResponse(stream1,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ checkDecodeResult(o1);
+
+ final ByteArrayInputStream stream2 = prepareStream(
+ lineDelimiter + "id: 2" + lineDelimiter + "data: \"ccc\"" +
lineDelimiter + "event: test" + lineDelimiter
+ + "retry: 123" + lineDelimiter + lineDelimiter + "id: 3" +
lineDelimiter + "data: \"ddd\"" + lineDelimiter
+ + "event: test3" + lineDelimiter + "retry: 321" + lineDelimiter +
lineDelimiter);
+ final Object o2 = processor.doDecodeResponse(stream2,
+
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class));
+ checkDecodeResult(o2,
+ new ItemChecker(1, Collections.singletonList("bbb"), null, null),
+ new ItemChecker(2, Collections.singletonList("ccc"), "test", 123L),
+ new ItemChecker(3, Collections.singletonList("ddd"), "test3", 321L));
+ final Object o4 = ((Flowable) processor.close()).toList().blockingGet();
+ checkDecodeResult(o4);
+ }
+
+ @Test
+ public void doDecodeResponseHalfPackage() {
+ for (int splitIndexesCount = 1; splitIndexesCount < 3;
splitIndexesCount++) {
+ for (int trailingDelimiterCount = 0; trailingDelimiterCount < 3;
trailingDelimiterCount++) {
+ doDecodeResponseHalfPackageTemplate("\n", splitIndexesCount,
trailingDelimiterCount);
+ doDecodeResponseHalfPackageTemplate("\r", splitIndexesCount,
trailingDelimiterCount);
+ doDecodeResponseHalfPackageTemplate("\r\n", splitIndexesCount,
trailingDelimiterCount);
+ }
+ }
+ }
+
+ private void doDecodeResponseHalfPackageTemplate(String lineDelimiter, int
splitIndexesCount,
+ int trailingDelimiterCount) {
+ final String messageTemplate =
+ "data: \"中文aaa\"" + lineDelimiter
+ + "id: 0" + lineDelimiter
+ + lineDelimiter
+ + "id: 1" + lineDelimiter
+ + "data: \"bbb中文\"" + lineDelimiter
+ + "data: \"123汉语\"" + lineDelimiter
+ + lineDelimiter
+ + "data:
\"~!@#$%^&*()_+=-0987654321`中文[]{}\\\\|;':\\\",./<>?abc\"" + lineDelimiter
+ + "data: \"文字ccc\"" + lineDelimiter
+ + "data: \"中文321\"" + lineDelimiter
+ + "id: 2" + lineDelimiter
+ + "retry: 3600" + lineDelimiter
+ + "event: test" + lineDelimiter.repeat(trailingDelimiterCount);
+ final byte[] messageTemplateBytes =
messageTemplate.getBytes(StandardCharsets.UTF_8);
+
+ int[] splitIndexes = new int[splitIndexesCount];
+ try {
+ runDecodeResponseHalfPackageTemplate(0, splitIndexes,
messageTemplateBytes);
+ } catch (AssertionError e) {
+ throw new AssertionError(e.getMessage() + ", messageTemplate=["
+ + messageTemplate
+ + "]", e);
+ }
+ }
+
+ private void runDecodeResponseHalfPackageTemplate(int cursor, int[]
splitIndexes, byte[] messageTemplateBytes) {
+ if (cursor != splitIndexes.length) {
+ for (int i = cursor == 0 ? 0 : splitIndexes[cursor - 1]; i <=
messageTemplateBytes.length; ++i) {
+ splitIndexes[cursor] = i;
+ runDecodeResponseHalfPackageTemplate(cursor + 1, splitIndexes,
messageTemplateBytes);
+ }
+ return;
+ }
+
+ final List<ByteArrayInputStream> byteArrayInputStreams =
splitByteArrayInputStreams(
+ splitIndexes, messageTemplateBytes);
+
+ final ProduceEventStreamProcessor processor = new
ProduceEventStreamProcessor();
+ final JavaType javaType =
RestObjectMapperFactory.getRestObjectMapper().constructType(String.class);
+
+ final List<SseEventResponseEntity<?>> entityList = new ArrayList<>();
+ for (ByteArrayInputStream byteArrayInputStream : byteArrayInputStreams) {
+ final List<SseEventResponseEntity<?>> entities;
+ try {
+ entities = (List<SseEventResponseEntity<?>>) processor.decodeResponse(
+ byteArrayInputStream, javaType);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ entityList.addAll(entities);
+ }
+ try {
+ entityList.addAll(((Flowable<SseEventResponseEntity<?>>)
processor.close()).toList().blockingGet());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ checkDecodeResult(entityList,
+ new ItemChecker(0, Collections.singletonList("中文aaa"), null, null),
+ new ItemChecker(1, Arrays.asList("bbb中文", "123汉语"), null, null),
+ new ItemChecker(2,
Arrays.asList("~!@#$%^&*()_+=-0987654321`中文[]{}\\|;':\",./<>?abc",
+ "文字ccc", "中文321"), "test", 3600L));
+ } catch (AssertionError e) {
+ throw new AssertionError(e.getMessage() + System.lineSeparator() +
+ ", splitIndexes=" + Arrays.toString(splitIndexes), e);
+ }
+ }
+
+ private List<ByteArrayInputStream> splitByteArrayInputStreams(int[]
splitIndexes, byte[] messageTemplateBytes) {
+ int splitHeader = 0;
+ final List<ByteArrayInputStream> byteArrayInputStreams = new ArrayList<>();
+ for (int split : splitIndexes) {
+ final byte[] segment = Arrays.copyOfRange(messageTemplateBytes,
splitHeader, split);
+ final ByteArrayInputStream stream = new ByteArrayInputStream(segment);
+ byteArrayInputStreams.add(stream);
+ splitHeader = split;
+ }
+
+ final byte[] segment = Arrays.copyOfRange(messageTemplateBytes,
splitIndexes[splitIndexes.length - 1],
+ messageTemplateBytes.length);
+ final ByteArrayInputStream stream = new ByteArrayInputStream(segment);
+ byteArrayInputStreams.add(stream);
+
+ return byteArrayInputStreams;
+ }
+
+ private void checkDecodeResult(Object obj) {
+ MatcherAssert.assertThat(obj, Matchers.instanceOf(List.class));
+ MatcherAssert.assertThat(((List<?>) obj).size(), Matchers.equalTo(0));
+ }
+
+ private void checkDecodeResult(Object obj, ItemChecker... checkers) {
+ MatcherAssert.assertThat(obj, Matchers.instanceOf(List.class));
+ final List<?> objList = (List<?>) obj;
+ MatcherAssert.assertThat("expect size of objList is " + checkers.length,
+ objList.size(), Matchers.equalTo(checkers.length));
+
+ for (int i = 0; i < objList.size(); i++) {
+ Object result = objList.get(i);
+ checkers[i]
+ .check(result);
+ }
+ }
+
+ private ByteArrayInputStream prepareStream(String buffer) {
+ return new ByteArrayInputStream(
+ buffer.getBytes(StandardCharsets.UTF_8));
+ }
+
+ private static class ItemChecker {
+ private int id;
+
+ private List<String> expectDataList;
+
+ private String event;
+
+ private Long retry;
+
+ public ItemChecker(int id, List<String> expectDataList, String event, Long
retry) {
+ this.id = id;
+ this.expectDataList = expectDataList;
+ this.event = event;
+ this.retry = retry;
+ }
+
+ public void check(Object actualResult) {
+ final SseEventResponseEntity<?> entity0 = ((SseEventResponseEntity<?>)
actualResult);
+ MatcherAssert.assertThat(entity0.getData(),
Matchers.instanceOf(List.class));
+ final List<?> data = entity0.getData();
+ MatcherAssert.assertThat("actual data = " + data.toString(), data.size(),
+ Matchers.equalTo(expectDataList.size()));
+ for (int i = 0; i < data.size(); i++) {
+ MatcherAssert.assertThat(data.get(i),
Matchers.equalTo(expectDataList.get(i)));
+ }
+ MatcherAssert.assertThat("actual event = " + entity0.getEvent(),
+ entity0.getEvent(), event == null ? Matchers.nullValue() :
Matchers.equalTo(event));
+ MatcherAssert.assertThat("actual retry = " + entity0.getRetry(),
+ entity0.getRetry(), retry == null ? Matchers.nullValue() :
Matchers.equalTo(retry));
+ MatcherAssert.assertThat("actual id = " + entity0.getId(),
+ entity0.getId(), Matchers.equalTo(id));
+ }
+ }
+}
diff --git a/demo/demo-cse-v2/gateway/pom.xml b/demo/demo-cse-v2/gateway/pom.xml
index 3dc9ed6c3..4d491cdf8 100644
--- a/demo/demo-cse-v2/gateway/pom.xml
+++ b/demo/demo-cse-v2/gateway/pom.xml
@@ -33,6 +33,20 @@
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>java-chassis-spring-boot-starter-standalone</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-to-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
diff --git a/demo/demo-cse-v2/provider/pom.xml
b/demo/demo-cse-v2/provider/pom.xml
index c5b57816f..e186908d3 100644
--- a/demo/demo-cse-v2/provider/pom.xml
+++ b/demo/demo-cse-v2/provider/pom.xml
@@ -52,6 +52,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava3</groupId>
+ <artifactId>rxjava</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java
similarity index 55%
copy from
foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
copy to
demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java
index 265ee658e..7cfdf6bdf 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++
b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java
@@ -15,26 +15,37 @@
* limitations under the License.
*/
-package org.apache.servicecomb.foundation.vertx.http;
+package org.apache.servicecomb.demo.model;
-import java.util.concurrent.CompletableFuture;
+public class Model {
+ private String name;
-import jakarta.servlet.http.HttpServletResponse;
-import jakarta.servlet.http.Part;
-import jakarta.ws.rs.core.Response.StatusType;
+ private int age;
-import io.vertx.core.http.HttpHeaders;
+ public Model() {
-public interface HttpServletResponseEx extends HttpServletResponse,
BodyBufferSupport {
- StatusType getStatusType();
+ }
+
+ public Model(String name, int age) {
+ this.name = name;
+ this.age = age;
+ }
- void setAttribute(String key, Object value);
+ public int getAge() {
+ return age;
+ }
- Object getAttribute(String key);
+ public Model setAge(int age) {
+ this.age = age;
+ return this;
+ }
- CompletableFuture<Void> sendPart(Part body);
+ public String getName() {
+ return name;
+ }
- default void setChunked(boolean chunked) {
- setHeader(HttpHeaders.TRANSFER_ENCODING.toString(),
HttpHeaders.CHUNKED.toString());
+ public Model setName(String name) {
+ this.name = name;
+ return this;
}
}
diff --git
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
new file mode 100644
index 000000000..4830f872c
--- /dev/null
+++
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.springboot.springmvc.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.apache.servicecomb.demo.CategorizedTestCase;
+import org.apache.servicecomb.demo.TestMgr;
+import org.apache.servicecomb.demo.model.Model;
+import
org.apache.servicecomb.springboot.springmvc.client.ThirdSvcConfiguration.ReactiveStreamClient;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ReactiveStreamIT implements CategorizedTestCase {
+ @Autowired
+ @Qualifier("reactiveStreamProvider")
+ ReactiveStreamClient reactiveStreamProvider;
+
+ @Override
+ public void testRestTransport() throws Exception {
+ testSseString(reactiveStreamProvider);
+ testSseStringWithParam(reactiveStreamProvider);
+ testSseModel(reactiveStreamProvider);
+ testSseResponseEntity(reactiveStreamProvider);
+ testSseMultipleData(reactiveStreamProvider);
+ }
+
+ private void testSseString(ReactiveStreamClient client) throws Exception {
+ Publisher<String> result = client.sseString();
+ TestMgr.check("abc", buildBufferString(result));
+ }
+
+ private void testSseStringWithParam(ReactiveStreamClient client) throws
Exception {
+ Publisher<String> result = client.sseStringWithParam("d");
+ TestMgr.check("abcd", buildBufferString(result));
+ }
+
+ private String buildBufferString(Publisher<String> result) throws Exception {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ StringBuilder buffer = new StringBuilder();
+ result.subscribe(new Subscriber<>() {
+ Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscription = s;
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(String s) {
+ buffer.append(s);
+ subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ subscription.cancel();
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ countDownLatch.countDown();
+ }
+ });
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ return buffer.toString();
+ }
+
+ private void testSseModel(ReactiveStreamClient client) throws Exception {
+ Publisher<Model> result = client.sseModel();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ StringBuilder buffer = new StringBuilder();
+ result.subscribe(new Subscriber<>() {
+ Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscription = s;
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(Model model) {
+ buffer.append(model.getName()).append(model.getAge());
+ subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ subscription.cancel();
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ countDownLatch.countDown();
+ }
+ });
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ TestMgr.check("jack0jack1jack2jack3jack4", buffer.toString());
+ }
+
+ private void testSseResponseEntity(ReactiveStreamClient client) throws
Exception {
+ Publisher<SseEventResponseEntity<Model>> result =
client.sseResponseEntity();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ StringBuilder buffer = new StringBuilder();
+ result.subscribe(new Subscriber<>() {
+ Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscription = s;
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(SseEventResponseEntity<Model> responseEntity) {
+ if (!StringUtils.isEmpty(responseEntity.getEvent())) {
+ buffer.append(responseEntity.getEvent());
+ }
+ for (Model model : responseEntity.getData()) {
+ buffer.append(model.getName()).append(model.getAge());
+ }
+ subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ subscription.cancel();
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ countDownLatch.countDown();
+ }
+ });
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ TestMgr.check("test0jack0test1jack1test2jack2", buffer.toString());
+ }
+
+ private void testSseMultipleData(ReactiveStreamClient client) throws
Exception {
+ Publisher<SseEventResponseEntity<Model>> result = client.sseMultipleData();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ StringBuilder buffer = new StringBuilder();
+ result.subscribe(new Subscriber<>() {
+ Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscription = s;
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(SseEventResponseEntity<Model> responseEntity) {
+ if (!StringUtils.isEmpty(responseEntity.getEvent())) {
+ buffer.append(responseEntity.getEvent());
+ }
+ for (Model model : responseEntity.getData()) {
+ buffer.append(model.getName()).append(model.getAge());
+ }
+ subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ subscription.cancel();
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ countDownLatch.countDown();
+ }
+ });
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ TestMgr.check("test0jack0tom0test1jack1tom1test2jack2tom2",
buffer.toString());
+ }
+}
diff --git
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java
new file mode 100644
index 000000000..3bd1986d9
--- /dev/null
+++
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.springboot.springmvc.client;
+
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.apache.servicecomb.demo.model.Model;
+import org.apache.servicecomb.provider.pojo.Invoker;
+import org.reactivestreams.Publisher;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+
+@Configuration
+public class ThirdSvcConfiguration {
+ @RequestMapping(path = "/")
+ public interface ReactiveStreamClient {
+ @GetMapping("/sseString")
+ Publisher<String> sseString();
+
+ @GetMapping("/sseStringWithParam")
+ Publisher<String> sseStringWithParam(String param);
+
+ @GetMapping("/sseModel")
+ Publisher<Model> sseModel();
+
+ @GetMapping("/sseResponseEntity")
+ Publisher<SseEventResponseEntity<Model>> sseResponseEntity();
+
+ @GetMapping("/sseMultipleData")
+ Publisher<SseEventResponseEntity<Model>> sseMultipleData();
+ }
+
+ @Bean("reactiveStreamProvider")
+ public ReactiveStreamClient reactiveStreamProvider() {
+ return Invoker.createProxy("springmvc", "ReactiveStreamController",
ReactiveStreamClient.class);
+ }
+}
diff --git
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java
new file mode 100644
index 000000000..323d3fbc3
--- /dev/null
+++
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.springboot.springmvc.server;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.apache.servicecomb.demo.model.Model;
+import org.apache.servicecomb.provider.rest.common.RestSchema;
+import org.reactivestreams.Publisher;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import io.reactivex.rxjava3.core.Flowable;
+
+@RestSchema(schemaId = "ReactiveStreamController")
+@RequestMapping(path = "/")
+public class ReactiveStreamController {
+ @GetMapping("/sseString")
+ public Publisher<String> sseString() {
+ return Flowable.fromArray("a", "b", "c");
+ }
+
+ @GetMapping("/sseStringWithParam")
+ public Publisher<String> sseStringWithParam(@RequestParam(name = "param")
String param) {
+ return Flowable.fromArray("a", "b", "c", param);
+ };
+
+ @GetMapping("/sseModel")
+ public Publisher<Model> sseModel() {
+ return Flowable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
+ .map(item -> new Model("jack", item.intValue()));
+ }
+
+ @GetMapping("/sseResponseEntity")
+ public Publisher<SseEventResponseEntity<Model>> sseResponseEntity() {
+ AtomicInteger index = new AtomicInteger(0);
+ return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS)
+ .map(item -> new SseEventResponseEntity<Model>()
+ .event("test" + index)
+ .id(index.getAndIncrement())
+ .retry(System.currentTimeMillis())
+ .data(new Model("jack", item.intValue())));
+ }
+
+ @GetMapping("/sseMultipleData")
+ public Publisher<SseEventResponseEntity<Model>> sseMultipleData() {
+ AtomicInteger index = new AtomicInteger(0);
+ return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS)
+ .map(item -> new SseEventResponseEntity<Model>()
+ .event("test" + index)
+ .id(index.getAndIncrement())
+ .retry(System.currentTimeMillis())
+ .data(new Model("jack", item.intValue()))
+ .data(new Model("tom", item.intValue())));
+ }
+}
diff --git a/dependencies/default/pom.xml b/dependencies/default/pom.xml
index 3132ff8ac..efeb106ed 100644
--- a/dependencies/default/pom.xml
+++ b/dependencies/default/pom.xml
@@ -107,6 +107,8 @@
<jakarta.xml.bind.version>4.0.2</jakarta.xml.bind.version>
<jaxb-api.version>2.3.1</jaxb-api.version>
<groovy.version>3.0.9</groovy.version>
+ <reactive-streams.version>1.0.4</reactive-streams.version>
+ <rxjava3.version>3.1.10</rxjava3.version>
<!-- Base dir of main -->
<main.basedir>${basedir}/../..</main.basedir>
</properties>
@@ -397,6 +399,21 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ <version>${reactive-streams.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava3</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${rxjava3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-rx-java3</artifactId>
+ <version>${vertx.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.maven</groupId>
diff --git a/foundations/foundation-vertx/pom.xml
b/foundations/foundation-vertx/pom.xml
index f9bcd9a25..c0f80507b 100644
--- a/foundations/foundation-vertx/pom.xml
+++ b/foundations/foundation-vertx/pom.xml
@@ -43,6 +43,10 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>foundation-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-rx-java3</artifactId>
+ </dependency>
<dependency>
<groupId>io.vertx</groupId>
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
index 5d363e874..d8800800f 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
@@ -144,7 +144,7 @@ public class TcpClientConnection extends TcpConnection {
if (Status.WORKING.equals(status)) {
// encode in sender thread
try (TcpOutputStream os = tcpClientPackage.createStream()) {
- write(os.getByteBuf());
+ write(os.getBuffer());
tcpClientPackage.finishWriteToBuffer();
}
return true;
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
index 2a0559bab..fc1fc17df 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java
@@ -25,6 +25,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import io.vertx.core.buffer.Buffer;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.Cookie;
import jakarta.servlet.http.Part;
@@ -85,7 +86,7 @@ public abstract class AbstractHttpServletResponse extends
BodyBufferSupportImpl
@Override
public void flushBuffer() throws IOException {
- throw new Error("not supported method");
+ // for vert.x do noting
}
@Override
@@ -222,4 +223,14 @@ public abstract class AbstractHttpServletResponse extends
BodyBufferSupportImpl
public CompletableFuture<Void> sendPart(Part body) {
throw new Error("not supported method");
}
+
+ @Override
+ public CompletableFuture<Void> sendBuffer(Buffer buffer) {
+ throw new Error("not supported method");
+ }
+
+ @Override
+ public void endResponse() throws IOException {
+ throw new Error("not supported method");
+ }
}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
index 265ee658e..dfad8711a 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
@@ -17,12 +17,15 @@
package org.apache.servicecomb.foundation.vertx.http;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.servlet.http.Part;
import jakarta.ws.rs.core.Response.StatusType;
+import io.reactivex.rxjava3.core.Flowable;
+import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
public interface HttpServletResponseEx extends HttpServletResponse,
BodyBufferSupport {
@@ -37,4 +40,16 @@ public interface HttpServletResponseEx extends
HttpServletResponse, BodyBufferSu
default void setChunked(boolean chunked) {
setHeader(HttpHeaders.TRANSFER_ENCODING.toString(),
HttpHeaders.CHUNKED.toString());
}
+
+ CompletableFuture<Void> sendBuffer(Buffer buffer);
+
+ default Flowable<Buffer> getFlowableBuffer() {
+ return null;
+ }
+
+ void endResponse() throws IOException;
+
+ default void setChunkedForEvent(boolean chunked) {
+ // not set header transfer-encoding=chunked in Rest Over Servlet, or will
have Multiple in response.
+ }
}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
index fbe459745..a227560cd 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
@@ -83,7 +83,7 @@ public class StandardHttpServletResponseEx extends
HttpServletResponseWrapper im
}
@Override
- public void flushBuffer() throws IOException {
+ public void endResponse() throws IOException {
byte[] bytes = getBodyBytes();
if (bytes != null) {
getOutputStream().write(bytes, 0, getBodyBytesLength());
@@ -122,4 +122,16 @@ public class StandardHttpServletResponseEx extends
HttpServletResponseWrapper im
Context context = Vertx.currentContext();
return new PumpFromPart(context, part).toOutputStream(outputStream, false);
}
+
+ @Override
+ public CompletableFuture<Void> sendBuffer(Buffer buffer) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ getOutputStream().write(buffer.getBytes(), 0, buffer.length());
+ future.complete(null);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
index da7e53217..0d8eaef9f 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java
@@ -24,6 +24,7 @@ import jakarta.ws.rs.core.Response.StatusType;
import org.apache.servicecomb.foundation.common.http.HttpStatus;
+import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
@@ -32,11 +33,18 @@ public class VertxClientResponseToHttpServletResponse
extends AbstractHttpServle
private StatusType statusType;
+ private Flowable<Buffer> flowableBuffer;
+
public VertxClientResponseToHttpServletResponse(HttpClientResponse
clientResponse, Buffer bodyBuffer) {
this.clientResponse = clientResponse;
setBodyBuffer(bodyBuffer);
}
+ public VertxClientResponseToHttpServletResponse(HttpClientResponse
clientResponse, Flowable<Buffer> buffer) {
+ this.clientResponse = clientResponse;
+ this.flowableBuffer = buffer;
+ }
+
@Override
public int getStatus() {
return clientResponse.statusCode();
@@ -69,4 +77,9 @@ public class VertxClientResponseToHttpServletResponse extends
AbstractHttpServle
public Collection<String> getHeaderNames() {
return clientResponse.headers().names();
}
+
+ @Override
+ public Flowable<Buffer> getFlowableBuffer() {
+ return flowableBuffer;
+ }
}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
index e0c518b29..5cb640f38 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.foundation.vertx.http;
+import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import
org.apache.servicecomb.foundation.vertx.stream.PumpFromPart;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
public class VertxServerResponseToHttpServletResponse extends
AbstractHttpServletResponse {
@@ -100,7 +102,7 @@ public class VertxServerResponseToHttpServletResponse
extends AbstractHttpServle
}
@Override
- public void flushBuffer() {
+ public void endResponse() {
if (context == Vertx.currentContext()) {
internalFlushBuffer();
return;
@@ -134,4 +136,26 @@ public class VertxServerResponseToHttpServletResponse
extends AbstractHttpServle
public void setChunked(boolean chunked) {
serverResponse.setChunked(chunked);
}
+
+ @Override
+ public CompletableFuture<Void> sendBuffer(Buffer buffer) {
+ if (serverResponse.closed()) {
+ return CompletableFuture.failedFuture(new IOException("Response is
closed before sending any data. "
+ + "Maybe client is timeout or check idle connection timeout for
provider is properly configured."));
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ serverResponse.write(buffer).onComplete(result -> {
+ if (result.failed()) {
+ future.completeExceptionally(result.cause());
+ } else {
+ future.complete(null);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setChunkedForEvent(boolean chunked) {
+ this.setChunked(chunked);
+ }
}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
index f063d5d06..42c9b3126 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java
@@ -20,9 +20,7 @@ package org.apache.servicecomb.foundation.vertx.stream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
-import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
-import io.vertx.core.buffer.impl.VertxByteBufAllocator;
/**
* BufferOutputStream.
@@ -32,40 +30,36 @@ import io.vertx.core.buffer.impl.VertxByteBufAllocator;
public class BufferOutputStream extends OutputStream {
private static final int DIRECT_BUFFER_SIZE = 1024;
- protected ByteBuf byteBuf;
+ protected Buffer byteBuf;
public BufferOutputStream() {
- this(VertxByteBufAllocator.DEFAULT.heapBuffer(DIRECT_BUFFER_SIZE,
Integer.MAX_VALUE));
+ this(Buffer.buffer(DIRECT_BUFFER_SIZE));
}
- public BufferOutputStream(ByteBuf buffer) {
+ public BufferOutputStream(Buffer buffer) {
this.byteBuf = buffer;
}
- public ByteBuf getByteBuf() {
- return byteBuf;
- }
-
public Buffer getBuffer() {
- return Buffer.buffer(byteBuf);
+ return byteBuf;
}
public int length() {
- return byteBuf.readableBytes();
+ return byteBuf.length();
}
public void writeByte(byte value) {
- byteBuf.writeByte(value);
+ byteBuf.appendByte(value);
}
// 实际是写byte
@Override
public void write(int byteValue) {
- byteBuf.writeByte((byte) byteValue);
+ byteBuf.appendByte((byte) byteValue);
}
public void write(boolean value) {
- byteBuf.writeBoolean(value);
+ byteBuf.appendByte(value ? (byte) 1 : (byte) 0);
}
public void writeInt(int pos, int value) {
@@ -73,20 +67,20 @@ public class BufferOutputStream extends OutputStream {
}
public void writeShort(short value) {
- byteBuf.writeShort(value);
+ byteBuf.appendShort(value);
}
public void writeInt(int value) {
- byteBuf.writeInt(value);
+ byteBuf.appendInt(value);
}
public void writeLong(long value) {
- byteBuf.writeLong(value);
+ byteBuf.appendLong(value);
}
public void writeString(String value) {
- byteBuf.writeInt(value.length());
- byteBuf.writeCharSequence(value, StandardCharsets.UTF_8);
+ writeInt(value.length());
+ byteBuf.appendString(value, StandardCharsets.UTF_8.toString());
}
@Override
@@ -96,7 +90,7 @@ public class BufferOutputStream extends OutputStream {
@Override
public void write(byte[] bytes, int offset, int len) {
- byteBuf.writeBytes(bytes, offset, len);
+ byteBuf.appendBytes(bytes, offset, len);
}
@Override
@@ -105,6 +99,6 @@ public class BufferOutputStream extends OutputStream {
}
public int writerIndex() {
- return byteBuf.writerIndex();
+ return byteBuf.length();
}
}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
index 95f460013..396cf1634 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
+++
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java
@@ -20,7 +20,6 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.vertx.core.Context;
@@ -49,7 +48,7 @@ public class TcpConnection {
// so this optimization:
// 1.avoid vertx's lock
// 2.reduce netty's task schedule
- private final Queue<ByteBuf> writeQueue = new ConcurrentLinkedQueue<>();
+ private final Queue<Buffer> writeQueue = new ConcurrentLinkedQueue<>();
private final AtomicLong writeQueueSize = new AtomicLong();
@@ -83,7 +82,7 @@ public class TcpConnection {
this.context = netSocket.getContext();
}
- public void write(ByteBuf buf) {
+ public void write(Buffer buf) {
writeQueue.add(buf);
long oldSize = writeQueueSize.getAndIncrement();
if (oldSize == 0) {
@@ -99,13 +98,13 @@ public class TcpConnection {
protected void writeInContext() {
CompositeByteBuf cbb = ByteBufAllocator.DEFAULT.compositeBuffer();
for (; ; ) {
- ByteBuf buf = writeQueue.poll();
+ Buffer buf = writeQueue.poll();
if (buf == null) {
break;
}
writeQueueSize.decrementAndGet();
- cbb.addComponent(true, buf);
+ cbb.addComponent(true, buf.getByteBuf());
if (cbb.numComponents() == cbb.maxNumComponents()) {
CompositeByteBuf last = cbb;
diff --git
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
index 2c9fdc895..cd468a935 100644
---
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
+++
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java
@@ -53,7 +53,7 @@ public class TestStream {
Assertions.assertTrue((1 < oBufferOutputStream.length()));
@SuppressWarnings("resource")
- BufferInputStream oBufferInputStream = new
BufferInputStream(oBufferOutputStream.getByteBuf());
+ BufferInputStream oBufferInputStream = new
BufferInputStream(oBufferOutputStream.getBuffer().getByteBuf());
Assertions.assertEquals("test", oBufferInputStream.readString());
Assertions.assertEquals(1, oBufferInputStream.readByte());
Assertions.assertEquals(true, oBufferInputStream.readBoolean());
diff --git
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
index 0767898f2..a10a0c193 100644
---
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
+++
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
@@ -27,11 +27,11 @@ import org.junit.Before;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import mockit.Deencapsulation;
@@ -90,14 +90,14 @@ public class TestTcpClientConnection {
Deencapsulation.setField(tcpClientConnection, "status", Status.WORKING);
long msgId = 1;
- ByteBuf byteBuf = Unpooled.buffer();
+ Buffer byteBuf = Buffer.buffer();
new Expectations(tcpClientConnection) {
{
tcpClientPackage.getMsgId();
result = msgId;
tcpClientPackage.createStream();
result = tcpOutputStream;
- tcpOutputStream.getByteBuf();
+ tcpOutputStream.getBuffer();
result = byteBuf;
}
};
@@ -172,7 +172,7 @@ public class TestTcpClientConnection {
{
tcpClientPackage.getMsgId();
result = msgId;
- tcpClientConnection.write((ByteBuf) any);
+ tcpClientConnection.write((Buffer) any);
}
};
new MockUp<Context>(context) {
diff --git
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
index e6083edd0..053cc56e7 100644
---
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
+++
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java
@@ -91,8 +91,7 @@ public class TestAbstractHttpServletResponse {
@Test
public void testFlushBuffer() {
- Error error = Assertions.assertThrows(Error.class, () ->
response.flushBuffer());
- checkError(error);
+ Assertions.assertDoesNotThrow(() -> response.flushBuffer());
}
@Test
diff --git
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
index c672fb288..5c34814f1 100644
---
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
+++
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
@@ -74,7 +74,7 @@ public class TestStandardHttpServletResponseEx {
}
@Test
- public void flushBuffer() throws IOException {
+ public void endResponse() throws IOException {
Buffer buffer = Buffer.buffer();
ServletOutputStream output = new ServletOutputStream() {
@Override
@@ -97,12 +97,12 @@ public class TestStandardHttpServletResponseEx {
responseEx = new StandardHttpServletResponseEx(response);
// no body
- responseEx.flushBuffer();
+ responseEx.endResponse();
Assertions.assertEquals(0, buffer.length());
Buffer body = Buffer.buffer().appendString("body");
responseEx.setBodyBuffer(body);
- responseEx.flushBuffer();
+ responseEx.endResponse();
Assertions.assertEquals("body", buffer.toString());
}
diff --git
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
index e438da8b0..5778091ac 100644
---
a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
+++
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
@@ -253,21 +253,21 @@ public class TestVertxServerResponseToHttpServletResponse
{
}
@Test
- public void flushBuffer_sameContext() throws IOException {
- response.flushBuffer();
+ public void endResponse_sameContext() throws IOException {
+ response.endResponse();
Assertions.assertFalse(runOnContextInvoked);
}
@Test
- public void flushBuffer_diffContext() throws IOException {
+ public void endResponse_diffContext() throws IOException {
new Expectations() {
{
Vertx.currentContext();
result = null;
}
};
- response.flushBuffer();
+ response.endResponse();
Assertions.assertTrue(runOnContextInvoked);
}
diff --git a/swagger/swagger-generator/generator-core/pom.xml
b/swagger/swagger-generator/generator-core/pom.xml
index c0aa02036..4a5825800 100644
--- a/swagger/swagger-generator/generator-core/pom.xml
+++ b/swagger/swagger-generator/generator-core/pom.xml
@@ -51,6 +51,10 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git
a/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java
b/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java
new file mode 100644
index 000000000..2f59fcb64
--- /dev/null
+++
b/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.swagger.generator.core.processor.response;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+import org.apache.servicecomb.swagger.generator.OperationGenerator;
+import org.apache.servicecomb.swagger.generator.SwaggerGenerator;
+import org.apache.servicecomb.swagger.jakarta.ModelConvertersAdapterJakarta;
+import org.reactivestreams.Publisher;
+
+import io.swagger.models.Model;
+import jakarta.ws.rs.core.MediaType;
+
+public class PublisherProcessor extends DefaultResponseTypeProcessor {
+ protected static final List<String> EVENTS_PRODUCE =
List.of(MediaType.SERVER_SENT_EVENTS);
+
+ public PublisherProcessor() {
+ extractActualType = true;
+ }
+
+ @Override
+ public Type getProcessType() {
+ return Publisher.class;
+ }
+
+ @Override
+ public Model process(SwaggerGenerator swaggerGenerator, OperationGenerator
operationGenerator,
+ Type genericResponseType) {
+
ModelConvertersAdapterJakarta.getInstance().addClassToSkip(Publisher.class.getName());
+ operationGenerator.getOperation().produces(EVENTS_PRODUCE);
+ return super.process(swaggerGenerator, operationGenerator,
genericResponseType);
+ }
+}
diff --git
a/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
b/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
index 53678b702..9abf37e8c 100644
---
a/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
+++
b/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
@@ -17,3 +17,4 @@
org.apache.servicecomb.swagger.generator.core.processor.response.CompletableFutureProcessor
org.apache.servicecomb.swagger.generator.core.processor.response.OptionalProcessor
+org.apache.servicecomb.swagger.generator.core.processor.response.PublisherProcessor
diff --git a/swagger/swagger-invocation/invocation-core/pom.xml
b/swagger/swagger-invocation/invocation-core/pom.xml
index 1680a0044..3bb762136 100644
--- a/swagger/swagger-invocation/invocation-core/pom.xml
+++ b/swagger/swagger-invocation/invocation-core/pom.xml
@@ -31,6 +31,14 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>swagger-generator-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-rx-java3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-core</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java
similarity index 51%
copy from
foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
copy to
swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java
index 265ee658e..636af6558 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java
@@ -14,27 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.servicecomb.swagger.invocation.response.consumer;
-package org.apache.servicecomb.foundation.vertx.http;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
-import java.util.concurrent.CompletableFuture;
+import io.reactivex.rxjava3.core.Flowable;
-import jakarta.servlet.http.HttpServletResponse;
-import jakarta.servlet.http.Part;
-import jakarta.ws.rs.core.Response.StatusType;
+public class PublisherConsumerResponseMapper implements ConsumerResponseMapper
{
+ private final boolean shouldExtractEntity;
-import io.vertx.core.http.HttpHeaders;
-
-public interface HttpServletResponseEx extends HttpServletResponse,
BodyBufferSupport {
- StatusType getStatusType();
-
- void setAttribute(String key, Object value);
-
- Object getAttribute(String key);
-
- CompletableFuture<Void> sendPart(Part body);
+ public PublisherConsumerResponseMapper(boolean shouldExtractEntity) {
+ this.shouldExtractEntity = shouldExtractEntity;
+ }
- default void setChunked(boolean chunked) {
- setHeader(HttpHeaders.TRANSFER_ENCODING.toString(),
HttpHeaders.CHUNKED.toString());
+ @Override
+ public Object mapResponse(Response response) {
+ Flowable<SseEventResponseEntity<?>> flowable = response.getResult();
+ if (shouldExtractEntity) {
+ return flowable.concatMap(entity ->
Flowable.fromIterable(entity.getData()));
+ }
+ return flowable;
}
}
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java
new file mode 100644
index 000000000..e7dd6d63c
--- /dev/null
+++
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.swagger.invocation.response.consumer;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import
org.apache.servicecomb.swagger.invocation.response.ResponseMapperFactorys;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
+
+public class PublisherConsumerResponseMapperFactory implements
ConsumerResponseMapperFactory {
+ @Override
+ public boolean isMatch(Type consumerType) {
+ if (!ParameterizedType.class.isAssignableFrom(consumerType.getClass())) {
+ return false;
+ }
+
+ return ((ParameterizedType)
consumerType).getRawType().equals(Publisher.class);
+ }
+
+ @Override
+ public ConsumerResponseMapper
createResponseMapper(ResponseMapperFactorys<ConsumerResponseMapper> factorys,
+ Type consumerType) {
+ Type realConsumerType = ((ParameterizedType)
consumerType).getActualTypeArguments()[0];
+ return new PublisherConsumerResponseMapper(
+
!realConsumerType.getTypeName().contains(SseEventResponseEntity.class.getName()));
+ }
+}
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
new file mode 100644
index 000000000..c5f56bb53
--- /dev/null
+++
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.swagger.invocation.response.producer;
+
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+
+import io.reactivex.rxjava3.core.Flowable;
+import jakarta.ws.rs.core.Response.StatusType;
+
+public class PublisherProducerResponseMapper implements ProducerResponseMapper
{
+ private final boolean shouldConstructEntity;
+
+ public PublisherProducerResponseMapper(boolean shouldConstructEntity) {
+ this.shouldConstructEntity = shouldConstructEntity;
+ }
+
+ @Override
+ public Response mapResponse(StatusType status, Object result) {
+ if (shouldConstructEntity) {
+ Flowable<SseEventResponseEntity<?>> responseEntity = ((Flowable<?>)
result).map(obj ->
+ new SseEventResponseEntity<>()
+ .data(obj));
+ return Response.create(status, responseEntity);
+ }
+ return Response.create(status, result);
+ }
+}
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java
new file mode 100644
index 000000000..9b35751f4
--- /dev/null
+++
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.swagger.invocation.response.producer;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import
org.apache.servicecomb.swagger.invocation.response.ResponseMapperFactorys;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
+
+public class PublisherProducerResponseMapperFactory implements
ProducerResponseMapperFactory {
+ @Override
+ public boolean isMatch(Type producerType) {
+ if (!ParameterizedType.class.isAssignableFrom(producerType.getClass())) {
+ return false;
+ }
+
+ return ((ParameterizedType)
producerType).getRawType().equals(Publisher.class);
+ }
+
+ @Override
+ public ProducerResponseMapper
createResponseMapper(ResponseMapperFactorys<ProducerResponseMapper> factorys,
+ Type providerType) {
+ Type realProducerType = ((ParameterizedType)
providerType).getActualTypeArguments()[0];
+ return new PublisherProducerResponseMapper(
+
!realProducerType.getTypeName().contains(SseEventResponseEntity.class.getName()));
+ }
+}
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java
new file mode 100644
index 000000000..923949de6
--- /dev/null
+++
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.swagger.invocation.sse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import io.swagger.annotations.ApiModelProperty;
+
+public class SseEventResponseEntity<T> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SseEventResponseEntity.class);
+
+ /**
+ * event id
+ */
+ private Integer id;
+
+ /**
+ * event type
+ */
+ private String event;
+
+ /**
+ * reconnection time
+ */
+ private Long retry;
+
+ /**
+ * business data
+ */
+ private List<T> datas = new ArrayList<>();
+
+ public SseEventResponseEntity<T> id(int id) {
+ if (this.id != null) {
+ LOGGER.warn("origin id: [{}] is exists, overridden by the current value:
[{}]", this.id, id);
+ }
+ this.id = id;
+ return this;
+ }
+
+ public SseEventResponseEntity<T> event(String event) {
+ if (!StringUtils.isEmpty(this.event)) {
+ LOGGER.warn("origin event: [{}] is exists, overridden by the current
value: [{}]", this.event, event);
+ }
+ this.event = event;
+ return this;
+ }
+
+ public SseEventResponseEntity<T> retry(long retry) {
+ if (this.retry != null) {
+ LOGGER.warn("origin retry: [{}] is exists, overridden by the current
value: [{}]", this.retry, retry);
+ }
+ this.retry = retry;
+ return this;
+ }
+
+ public SseEventResponseEntity<T> data(T data) {
+ if (data == null) {
+ LOGGER.warn("The data content cannot be null!");
+ } else {
+ datas.add(data);
+ }
+ return this;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public String getEvent() {
+ return event;
+ }
+
+ public Long getRetry() {
+ return retry;
+ }
+
+ public List<T> getData() {
+ return datas;
+ }
+
+ @JsonIgnore
+ @ApiModelProperty(hidden = true)
+ public boolean isEmpty() {
+ return id == null
+ && event == null
+ && retry == null
+ && datas.isEmpty();
+ }
+}
diff --git
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java
similarity index 55%
copy from
foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
copy to
swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java
index 265ee658e..a72161cf2 100644
---
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java
@@ -14,27 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.servicecomb.swagger.invocation.sse;
-package org.apache.servicecomb.foundation.vertx.http;
+import
org.apache.servicecomb.swagger.generator.core.processor.response.DefaultResponseTypeProcessor;
-import java.util.concurrent.CompletableFuture;
-
-import jakarta.servlet.http.HttpServletResponse;
-import jakarta.servlet.http.Part;
-import jakarta.ws.rs.core.Response.StatusType;
-
-import io.vertx.core.http.HttpHeaders;
-
-public interface HttpServletResponseEx extends HttpServletResponse,
BodyBufferSupport {
- StatusType getStatusType();
-
- void setAttribute(String key, Object value);
-
- Object getAttribute(String key);
-
- CompletableFuture<Void> sendPart(Part body);
+public class SseEventResponseEntityProcessor extends
DefaultResponseTypeProcessor {
+ public SseEventResponseEntityProcessor() {
+ extractActualType = true;
+ }
- default void setChunked(boolean chunked) {
- setHeader(HttpHeaders.TRANSFER_ENCODING.toString(),
HttpHeaders.CHUNKED.toString());
+ @Override
+ public Class<?> getProcessType() {
+ return SseEventResponseEntity.class;
}
}
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
index bb2abfe50..965674ee0 100644
---
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
+++
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor
@@ -17,3 +17,4 @@
org.apache.servicecomb.swagger.invocation.generator.ScbResponseProcessor
org.apache.servicecomb.swagger.invocation.ws.ServerWebSocketResponseProcessor
+org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntityProcessor
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
index 0474058a9..5092ed6b3 100644
---
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
+++
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory
@@ -19,3 +19,4 @@
org.apache.servicecomb.swagger.invocation.response.consumer.CseResponseConsumerR
org.apache.servicecomb.swagger.invocation.response.consumer.CompletableFutureConsumerResponseMapperFactory
org.apache.servicecomb.swagger.invocation.response.consumer.DefaultConsumerResponseMapperFactory
org.apache.servicecomb.swagger.invocation.response.consumer.OptionalConsumerResponseMapperFactory
+org.apache.servicecomb.swagger.invocation.response.consumer.PublisherConsumerResponseMapperFactory
\ No newline at end of file
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
index dac2a90e3..bf675cf96 100644
---
a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
+++
b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory
@@ -19,3 +19,4 @@
org.apache.servicecomb.swagger.invocation.response.producer.CseResponseProducerR
org.apache.servicecomb.swagger.invocation.response.producer.CompletableFutureProducerResponseMapperFactory
org.apache.servicecomb.swagger.invocation.response.producer.DefaultProducerResponseMapperFactory
org.apache.servicecomb.swagger.invocation.response.producer.OptionalProducerResponseMapperFactory
+org.apache.servicecomb.swagger.invocation.response.producer.PublisherProducerResponseMapperFactory
\ No newline at end of file
diff --git
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
index 36e0af598..f973f9e1c 100644
---
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
+++
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
@@ -58,6 +58,6 @@ public class HighwayProducerInvocationFlow extends
ProducerInvocationFlow {
@Override
protected void sendResponse(Invocation invocation, Response response) {
HighwayTransportContext transportContext =
invocation.getTransportContext();
- connection.write(transportContext.getResponseBuffer().getByteBuf());
+ connection.write(transportContext.getResponseBuffer());
}
}
diff --git
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 98fab6362..0c9e8e79b 100644
---
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -153,7 +153,7 @@ public class HighwayServerInvoke {
respBuffer = HighwayCodec.encodeResponse(msgId, header, bodySchema,
response.getResult());
}
invocation.getInvocationStageTrace().finishServerFiltersResponse();
- connection.write(respBuffer.getByteBuf());
+ connection.write(respBuffer);
} catch (Exception e) {
// keep highway performance and simple, this encoding/decoding error not
need handle by client
String msg = String.format("encode response failed, %s, msgId=%d",
diff --git a/transports/transport-rest/transport-rest-client/pom.xml
b/transports/transport-rest/transport-rest-client/pom.xml
index 0a22aaf8c..c2265042a 100644
--- a/transports/transport-rest/transport-rest-client/pom.xml
+++ b/transports/transport-rest/transport-rest-client/pom.xml
@@ -40,6 +40,10 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>common-rest</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-rx-java3</artifactId>
+ </dependency>
<dependency>
<groupId>io.vertx</groupId>
diff --git
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
index af5ee1d10..7e37ebc9a 100644
---
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
+++
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
@@ -18,11 +18,14 @@
package org.apache.servicecomb.transport.rest.client.http;
import java.util.Collection;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import jakarta.ws.rs.core.HttpHeaders;
+import jakarta.ws.rs.core.MediaType;
import org.apache.servicecomb.common.rest.RestConst;
+import
org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
import
org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
@@ -35,12 +38,17 @@ import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JavaType;
import com.netflix.config.DynamicPropertyFactory;
+import io.reactivex.rxjava3.core.Flowable;
+import io.vertx.core.buffer.Buffer;
+
public class DefaultHttpClientFilter implements HttpClientFilter {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultHttpClientFilter.class);
@@ -76,6 +84,7 @@ public class DefaultHttpClientFilter implements
HttpClientFilter {
if (idx != -1) {
contentTypeForFind = contentType.substring(0, idx);
}
+
return restOperation.findProduceProcessor(contentTypeForFind);
}
@@ -84,12 +93,11 @@ public class DefaultHttpClientFilter implements
HttpClientFilter {
if (result != null) {
return Response.create(responseEx.getStatusType(), result);
}
-
OperationMeta operationMeta = invocation.getOperationMeta();
JavaType responseType =
invocation.findResponseType(responseEx.getStatus());
RestOperationMeta swaggerRestOperation =
operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION);
ProduceProcessor produceProcessor =
findProduceProcessor(swaggerRestOperation, responseEx);
- if (produceProcessor == null) {
+ if (produceProcessor == null && !isEventStream(responseEx)) {
// This happens outside the runtime such as Servlet filter response.
Here we give a default json parser to it
// and keep user data not get lose.
LOGGER.warn("Response content-type {} is not supported. Method {}, path
{}, statusCode {}, reasonPhrase {}.",
@@ -103,7 +111,16 @@ public class DefaultHttpClientFilter implements
HttpClientFilter {
}
try {
- result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(),
responseType);
+ if (responseEx.getFlowableBuffer() == null) {
+ result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(),
responseType);
+ } else {
+ Flowable<Buffer> flowable = responseEx.getFlowableBuffer();
+ ProduceEventStreamProcessor finalProduceProcessor = new
ProduceEventStreamProcessor();
+ result = flowable.concatMap(buffer ->
extractFlowableBody(finalProduceProcessor, responseType, buffer))
+ .doFinally(finalProduceProcessor::close)
+ .doOnCancel(finalProduceProcessor::close)
+ .filter(Objects::nonNull);
+ }
Response response = Response.create(responseEx.getStatusType(), result);
if (response.isFailed()) {
LOGGER.warn("invoke operation [{}] failed, status={}, msg={}",
invocation.getMicroserviceQualifiedName(),
@@ -130,6 +147,16 @@ public class DefaultHttpClientFilter implements
HttpClientFilter {
}
}
+ private boolean isEventStream(HttpServletResponseEx responseEx) {
+ return responseEx.getHeader(HttpHeaders.CONTENT_TYPE) != null
+ &&
responseEx.getHeader(HttpHeaders.CONTENT_TYPE).contains(MediaType.SERVER_SENT_EVENTS);
+ }
+
+ protected Publisher<SseEventResponseEntity<?>>
extractFlowableBody(ProduceEventStreamProcessor produceProcessor,
+ JavaType responseType, Buffer buffer) throws Exception {
+ return produceProcessor.decodeResponse(buffer, responseType);
+ }
+
@Override
public Response afterReceiveResponse(Invocation invocation,
HttpServletResponseEx responseEx) {
Response response = extractResponse(invocation, responseEx);
diff --git
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index f749f1173..1cb7c20e7 100644
---
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
+import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
@@ -53,6 +54,7 @@ import
org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
@@ -60,6 +62,7 @@ import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
+import io.vertx.rxjava3.FlowableHelper;
public class RestClientInvocation {
private static final Logger LOGGER =
LoggerFactory.getLogger(RestClientInvocation.class);
@@ -87,6 +90,8 @@ public class RestClientInvocation {
private boolean alreadyFailed = false;
+ protected static final String EVENTS_MEDIA_TYPE =
MediaType.SERVER_SENT_EVENTS;
+
public RestClientInvocation(HttpClientWithContext httpClientWithContext,
List<HttpClientFilter> httpClientFilters) {
this.httpClientWithContext = httpClientWithContext;
this.httpClientFilters = httpClientFilters;
@@ -218,6 +223,11 @@ public class RestClientInvocation {
return;
}
+ if (isServerSendEvents(httpClientResponse)) {
+
processFlowableResponseBody(FlowableHelper.toFlowable(httpClientResponse));
+ return;
+ }
+
httpClientResponse.exceptionHandler(e -> {
invocation.getTraceIdLogger().error(LOGGER, "Failed to receive response,
local:{}, remote:{}, message={}.",
getLocalAddress(), httpClientResponse.netSocket().remoteAddress(),
@@ -228,17 +238,38 @@ public class RestClientInvocation {
clientResponse.bodyHandler(this::processResponseBody);
}
+ private boolean isServerSendEvents(HttpClientResponse httpClientResponse) {
+ if (httpClientResponse.getHeader("Content-Type") == null) {
+ return false;
+ }
+ return
httpClientResponse.getHeader("Content-Type").contains(EVENTS_MEDIA_TYPE);
+ }
+
/**
* after this method, connection will be recycled to connection pool
* @param responseBuf response body buffer, when download, responseBuf is
null, because download data by ReadStreamPart
*/
protected void processResponseBody(Buffer responseBuf) {
+ HttpServletResponseEx responseEx =
+ new VertxClientResponseToHttpServletResponse(clientResponse,
responseBuf);
+ doProcessResponseBody(responseEx);
+ }
+
+ /**
+ * after this method, connection will be recycled to connection pool
+ * @param flowable sse flowable response
+ */
+ protected void processFlowableResponseBody(Flowable<Buffer> flowable) {
+ HttpServletResponseEx responseEx =
+ new VertxClientResponseToHttpServletResponse(clientResponse, flowable);
+ doProcessResponseBody(responseEx);
+ }
+
+ private void doProcessResponseBody(HttpServletResponseEx responseEx) {
invocation.getInvocationStageTrace().finishReceiveResponse();
invocation.getResponseExecutor().execute(() -> {
try {
invocation.getInvocationStageTrace().startClientFiltersResponse();
- HttpServletResponseEx responseEx =
- new VertxClientResponseToHttpServletResponse(clientResponse,
responseBuf);
for (HttpClientFilter filter : httpClientFilters) {
if (filter.enabled()) {
Response response = filter.afterReceiveResponse(invocation,
responseEx);
diff --git
a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
index 5c0b891e4..2b5192a54 100644
---
a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
+++
b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java
@@ -141,6 +141,8 @@ public class TestDefaultHttpClientFilter {
result = 400;
responseEx.getBodyBuffer();
result = new BufferImpl().appendString("abc");
+ responseEx.getFlowableBuffer();
+ result = null;
}
};
new MockUp<DefaultHttpClientFilter>() {
@@ -189,6 +191,8 @@ public class TestDefaultHttpClientFilter {
result = 200;
responseEx.getBodyBuffer();
result = new BufferImpl().appendString("abc");
+ responseEx.getFlowableBuffer();
+ result = null;
}
};
new MockUp<DefaultHttpClientFilter>() {
@@ -237,6 +241,8 @@ public class TestDefaultHttpClientFilter {
result = Status.FORBIDDEN;
responseEx.getBodyBuffer();
result = Buffer.buffer(JsonUtils.writeValueAsString(data).getBytes());
+ responseEx.getFlowableBuffer();
+ result = null;
}
};
@@ -284,6 +290,8 @@ public class TestDefaultHttpClientFilter {
responseEx.getStatusType();
result = Status.OK;
+ responseEx.getFlowableBuffer();
+ result = null;
}
};