This is an automated email from the ASF dual-hosted git repository.
youling1128 pushed a commit to branch 2.9.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/2.9.x by this push:
new d92d9a40a [#4873] unified Flowable conversion to prevent internal
typecasting exceptions. (#4909)
d92d9a40a is described below
commit d92d9a40ac98c4fcab972afa5f05f8f1f74d1360
Author: Alex <[email protected]>
AuthorDate: Fri Aug 22 17:13:05 2025 +0800
[#4873] unified Flowable conversion to prevent internal typecasting
exceptions. (#4909)
---
.../rest/filter/inner/ServerRestArgsFilter.java | 43 +++++++++-------------
.../producer/PublisherProducerResponseMapper.java | 11 ++++--
2 files changed, 25 insertions(+), 29 deletions(-)
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
index 281769d75..ee97dcce2 100644
---
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
@@ -31,6 +31,7 @@ import
org.apache.servicecomb.common.rest.definition.RestOperationMeta;
import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.OperationMeta;
+import
org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
import org.apache.servicecomb.foundation.common.utils.PartUtils;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
@@ -126,19 +127,15 @@ public class ServerRestArgsFilter implements
HttpServerFilter {
@Override
public void onNext(Object o) {
- try {
- writeResponse(responseEx, produceProcessor, o,
response).whenComplete((r, e) -> {
- if (e != null) {
- subscription.cancel();
- result.completeExceptionally(e);
- return;
- }
+ writeResponse(responseEx, produceProcessor, o, response).thenApply(r
-> {
subscription.request(1);
+ return r;
+ })
+ .exceptionally(e -> {
+ new SuppressedRunnableWrapper(() -> subscription.cancel()).run();
+ new SuppressedRunnableWrapper(() ->
result.completeExceptionally(e)).run();
+ return response;
});
- } catch (Throwable e) {
- LOGGER.warn("Failed to subscribe event: {}", o, e);
- result.completeExceptionally(e);
- }
}
@Override
@@ -158,22 +155,18 @@ public class ServerRestArgsFilter implements
HttpServerFilter {
HttpServletResponseEx responseEx, ProduceProcessor produceProcessor,
Object data, Response response) {
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
produceProcessor.encodeResponse(output, data);
- CompletableFuture<Response> result = new CompletableFuture<>();
- responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> {
- if (e != null) {
- result.completeExceptionally(e);
- }
- try {
- responseEx.flushBuffer();
- } catch (IOException ex) {
- LOGGER.warn("Failed to flush buffer for Server Send Events", ex);
- }
- });
- result.complete(response);
- return result;
+ return responseEx.sendBuffer(output.getBuffer())
+ .thenApply(v -> {
+ try {
+ responseEx.flushBuffer();
+ return response;
+ } catch (IOException e) {
+ LOGGER.warn("Failed to flush buffer for Server Send Events", e);
+ throw new IllegalStateException("Failed to flush buffer for
Server Send Events", e);
+ }
+ });
} catch (Throwable e) {
LOGGER.error("internal service error must be fixed.", e);
- responseEx.setStatus(500);
return CompletableFuture.failedFuture(e);
}
}
diff --git
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
index c5f56bb53..c8ec40ebb 100644
---
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
+++
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
@@ -18,6 +18,7 @@ package
org.apache.servicecomb.swagger.invocation.response.producer;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.Flowable;
import jakarta.ws.rs.core.Response.StatusType;
@@ -31,12 +32,14 @@ public class PublisherProducerResponseMapper implements
ProducerResponseMapper {
@Override
public Response mapResponse(StatusType status, Object result) {
+ // Unified Flowable conversion to prevent internal typecasting exceptions.
+ final Flowable<?> flowableResult = result instanceof Flowable ?
+ (Flowable<?>) result : Flowable.fromPublisher(((Publisher<?>)
result));
if (shouldConstructEntity) {
- Flowable<SseEventResponseEntity<?>> responseEntity = ((Flowable<?>)
result).map(obj ->
- new SseEventResponseEntity<>()
- .data(obj));
+ Flowable<SseEventResponseEntity<?>> responseEntity = flowableResult
+ .map(obj -> new SseEventResponseEntity<>().data(obj));
return Response.create(status, responseEntity);
}
- return Response.create(status, result);
+ return Response.create(status, flowableResult);
}
}