This is an automated email from the ASF dual-hosted git repository.

youling1128 pushed a commit to branch 2.9.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/2.9.x by this push:
     new d92d9a40a [#4873] unified Flowable conversion to prevent internal typecasting exceptions. (#4909)
d92d9a40a is described below

commit d92d9a40ac98c4fcab972afa5f05f8f1f74d1360
Author: Alex <[email protected]>
AuthorDate: Fri Aug 22 17:13:05 2025 +0800

    [#4873] unified Flowable conversion to prevent internal typecasting exceptions. (#4909)
---
 .../rest/filter/inner/ServerRestArgsFilter.java    | 43 +++++++++-------------
 .../producer/PublisherProducerResponseMapper.java  | 11 ++++--
 2 files changed, 25 insertions(+), 29 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
index 281769d75..ee97dcce2 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
@@ -31,6 +31,7 @@ import 
org.apache.servicecomb.common.rest.definition.RestOperationMeta;
 import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.OperationMeta;
+import 
org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
 import org.apache.servicecomb.foundation.common.utils.PartUtils;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
@@ -126,19 +127,15 @@ public class ServerRestArgsFilter implements 
HttpServerFilter {
 
       @Override
       public void onNext(Object o) {
-        try {
-          writeResponse(responseEx, produceProcessor, o, 
response).whenComplete((r, e) -> {
-            if (e != null) {
-              subscription.cancel();
-              result.completeExceptionally(e);
-              return;
-            }
+        writeResponse(responseEx, produceProcessor, o, response).thenApply(r 
-> {
             subscription.request(1);
+            return r;
+          })
+          .exceptionally(e -> {
+            new SuppressedRunnableWrapper(() -> subscription.cancel()).run();
+            new SuppressedRunnableWrapper(() -> 
result.completeExceptionally(e)).run();
+            return response;
           });
-        } catch (Throwable e) {
-          LOGGER.warn("Failed to subscribe event: {}", o, e);
-          result.completeExceptionally(e);
-        }
       }
 
       @Override
@@ -158,22 +155,18 @@ public class ServerRestArgsFilter implements 
HttpServerFilter {
       HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, 
Object data, Response response) {
     try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
       produceProcessor.encodeResponse(output, data);
-      CompletableFuture<Response> result = new CompletableFuture<>();
-      responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> {
-        if (e != null) {
-          result.completeExceptionally(e);
-        }
-        try {
-          responseEx.flushBuffer();
-        } catch (IOException ex) {
-          LOGGER.warn("Failed to flush buffer for Server Send Events", ex);
-        }
-      });
-      result.complete(response);
-      return result;
+      return responseEx.sendBuffer(output.getBuffer())
+          .thenApply(v -> {
+            try {
+              responseEx.flushBuffer();
+              return response;
+            } catch (IOException e) {
+              LOGGER.warn("Failed to flush buffer for Server Send Events", e);
+              throw new IllegalStateException("Failed to flush buffer for 
Server Send Events", e);
+            }
+          });
     } catch (Throwable e) {
       LOGGER.error("internal service error must be fixed.", e);
-      responseEx.setStatus(500);
       return CompletableFuture.failedFuture(e);
     }
   }
diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
index c5f56bb53..c8ec40ebb 100644
--- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
+++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java
@@ -18,6 +18,7 @@ package 
org.apache.servicecomb.swagger.invocation.response.producer;
 
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
+import org.reactivestreams.Publisher;
 
 import io.reactivex.rxjava3.core.Flowable;
 import jakarta.ws.rs.core.Response.StatusType;
@@ -31,12 +32,14 @@ public class PublisherProducerResponseMapper implements 
ProducerResponseMapper {
 
   @Override
   public Response mapResponse(StatusType status, Object result) {
+    // Unified Flowable conversion to prevent internal typecasting exceptions.
+    final Flowable<?> flowableResult = result instanceof Flowable ?
+            (Flowable<?>) result : Flowable.fromPublisher(((Publisher<?>) 
result));
     if (shouldConstructEntity) {
-      Flowable<SseEventResponseEntity<?>> responseEntity = ((Flowable<?>) 
result).map(obj ->
-          new SseEventResponseEntity<>()
-              .data(obj));
+      Flowable<SseEventResponseEntity<?>> responseEntity = flowableResult
+          .map(obj -> new SseEventResponseEntity<>().data(obj));
       return Response.create(status, responseEntity);
     }
-    return Response.create(status, result);
+    return Response.create(status, flowableResult);
   }
 }

Reply via email to