This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 32bb460aa00e CAMEL-23804: Fix SSE streaming on camel-platform-http-vertx
32bb460aa00e is described below

commit 32bb460aa00ef206d18b51d67bf649244820b473
Author: James Netherton <[email protected]>
AuthorDate: Fri Jun 19 16:02:14 2026 +0100

    CAMEL-23804: Fix SSE streaming on camel-platform-http-vertx
    
    AsyncInputStream greedily filled its 16KB buffer before emitting data,
    which blocked SSE events behind the next queue poll. Add an eagerFlush
    mode that emits each chunk immediately, and activate it automatically
    for text/event-stream responses. Also slice the output buffer to the
    actual bytes written to prevent trailing zeroes on partial reads.
    
    Update camel-a2a docs to remove the Vert.x SSE streaming limitation
    now that platform-http delivers events eagerly.
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../camel-a2a/src/main/docs/a2a-component.adoc     |  22 +---
 .../platform/http/vertx/AsyncInputStream.java      |  18 ++-
 .../http/vertx/VertxPlatformHttpSupport.java       |   6 +-
 .../http/vertx/VertxPlatformHttpSseTest.java       | 138 +++++++++++++++++++++
 4 files changed, 162 insertions(+), 22 deletions(-)
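
The buffering change described in the commit message can be illustrated with a small model that does not depend on Vert.x. This is only a sketch under stated assumptions: `EagerFlushSketch` and `emit` are hypothetical names, the source is modelled as a list of chunks (one per read), and the 16KB buffer is shrunk to a few bytes. It is not the `AsyncInputStream` implementation. It shows two things: greedy mode holds data until the buffer fills or the stream ends, while eager mode emits after every read; and each emitted buffer is cut to the bytes actually written, so a partial read carries no trailing zeroes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Vert.x-free model of greedy fill versus eager flush.
public class EagerFlushSketch {

    // reads:      chunks the producer makes available, one per read call
    // capacity:   size of the output buffer (16KB in the commit, tiny here)
    // eagerFlush: emit after every read instead of waiting for a full buffer
    public static List<byte[]> emit(List<byte[]> reads, int capacity, boolean eagerFlush) {
        List<byte[]> emitted = new ArrayList<>();
        byte[] writeBuff = new byte[capacity];
        int written = 0;

        for (byte[] chunk : reads) {
            int pos = 0;
            while (pos < chunk.length) {
                int n = Math.min(chunk.length - pos, capacity - written);
                System.arraycopy(chunk, pos, writeBuff, written, n);
                written += n;
                pos += n;
                if (written == capacity) {
                    // Buffer is full: emit in both modes.
                    emitted.add(Arrays.copyOfRange(writeBuff, 0, written));
                    written = 0;
                }
            }
            if (eagerFlush && written > 0) {
                // Eager mode: emit the partial buffer, sliced to the bytes written.
                emitted.add(Arrays.copyOfRange(writeBuff, 0, written));
                written = 0;
            }
        }
        if (written > 0) {
            // End of stream: emit whatever is left, again sliced.
            emitted.add(Arrays.copyOfRange(writeBuff, 0, written));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<byte[]> reads = List.of(
                "data: event-1\n\n".getBytes(StandardCharsets.UTF_8),
                "data: event-2\n\n".getBytes(StandardCharsets.UTF_8));

        System.out.println("greedy buffers: " + emit(reads, 64, false).size());
        System.out.println("eager buffers:  " + emit(reads, 64, true).size());
    }
}
```

In this model the two 15-byte events arrive as one 30-byte buffer in greedy mode and as two 15-byte buffers in eager mode. The real class additionally blocks on the next read while filling, which is why a slow SSE producer left the first event stuck behind the second.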

diff --git a/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc b/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc
index 03c08d7ca30f..4515f9949323 100644
--- a/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc
+++ b/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc
@@ -36,7 +36,7 @@ Current Preview limitations:
 * Extended Agent Card / `GetExtendedAgentCard` is not implemented in this first release.
 * Consumers validate operation requests by default. Local-only examples in this page set `validateAuth=false`; network-exposed agents should use an agent card with `securitySchemes` and `securityRequirements` plus `apiKey`, `bearerToken`, or `oauthProfile` endpoint configuration.
 * The REST binding uses A2A custom-method paths containing colons, such as `/message:send`. These paths collide on Vert.x/platform-http. Use `protocolBinding=JSONRPC` with platform-http, or use `httpServerComponent=undertow` or `httpServerComponent=jetty` for REST custom-method routes.
-* Real-time SSE streaming is verified with Undertow and Jetty. Vert.x/platform-http buffers `InputStream` responses and does not deliver events as they are emitted.
+* Real-time SSE streaming is verified with Vert.x/platform-http, Undertow, and Jetty.
 * The default task store is in-memory and single-JVM. Register a custom `A2ATaskStore` for durable or cross-node task state.
 
 Maven users will need to add the following dependency to their `pom.xml`:
@@ -51,7 +51,7 @@ Maven users will need to add the following dependency to their `pom.xml`:
 </dependency>
 ----
 
-Consumer routes also need an HTTP server component at runtime. For example, add `camel-platform-http-vertx` for JSON-RPC agents, or `camel-undertow` / `camel-jetty` for REST custom-method routes and real-time SSE streaming. See <<Dependencies>> and <<HTTP Server Component Discovery>>.
+Consumer routes also need an HTTP server component at runtime. For example, add `camel-platform-http-vertx` for JSON-RPC agents (including SSE streaming), or `camel-undertow` / `camel-jetty` for REST custom-method routes. See <<Dependencies>> and <<HTTP Server Component Discovery>>.
 
 == URI Format
 
@@ -249,7 +249,7 @@ The A2A REST protocol uses Google's Custom Method convention with colons in path
 
 This means operation serving is not reliable with REST binding on Vert.x/platform-http. For example, `POST /message:stream` can be handled by the `SendMessage` route instead of the streaming route.
 
-*Workaround:* Use `protocolBinding=JSONRPC` for agents running on Vert.x (the default `platform-http`). Alternatively, use `httpServerComponent=jetty` or `httpServerComponent=undertow`; their routers treat colons as literal characters. For real-time SSE streaming, use Jetty or Undertow because Vert.x/platform-http buffers `InputStream` responses.
+*Workaround:* Use `protocolBinding=JSONRPC` for agents running on Vert.x (the default `platform-http`). Alternatively, use `httpServerComponent=jetty` or `httpServerComponent=undertow`; their routers treat colons as literal characters.
 
 === Authentication
 
@@ -559,19 +559,11 @@ The consumer supports two SSE streaming patterns:
 * *`SendStreamingMessage`* — The stream starts with the submitted `Task`, followed by progress events from `A2AProgress` or `${a2a:emit()}`. When the route completes, the body is emitted as the final message.
 * *`SubscribeToTask`* — Clients subscribe via `POST /tasks/\{id}:subscribe` to receive real-time SSE updates for an existing non-terminal task. The component uses an Event-to-Stream Bridge (`QueueStreamEmitter` + `SseQueueInputStream`) with heartbeat comments to keep the connection alive. The stream sends the current `Task` first and ends when the task later reaches a terminal state. Subscribing to an already-terminal task returns `UnsupportedOperationError`.
 
-[IMPORTANT]
+[NOTE]
 ====
 *HTTP Server Component for Streaming*
 
-For real-time SSE streaming, use `httpServerComponent=undertow` or `httpServerComponent=jetty`. The default Vert.x `platform-http` buffers `InputStream` responses due to an `AsyncInputStream` greedy-fill loop, causing all events to arrive at once.
-
-[source,yaml]
-----
-parameters:
-  httpServerComponent: undertow
-----
-
-This requires `camel-undertow` (or `camel-jetty`) on the classpath.
+Real-time SSE streaming works with all tested HTTP server components: Vert.x/platform-http, Undertow, and Jetty.
 ====
 
 SSE streaming parameters:
@@ -1654,11 +1646,9 @@ The preview component does not auto-create arbitrary HTTP server components and
 
 Tested behavior in this Preview release:
 
-* `camel-platform-http` / Vert.x - suitable for JSON-RPC agents and serving the public agent card. REST custom-method paths containing colons can collide, and SSE `InputStream` responses are buffered.
+* `camel-platform-http` / Vert.x - suitable for JSON-RPC agents (including SSE streaming) and serving the public agent card. REST custom-method paths containing colons can collide.
 * `camel-undertow` and `camel-jetty` - suitable for REST custom-method paths and real-time SSE streaming.
 
-For SSE streaming, specify `httpServerComponent=undertow` or `httpServerComponent=jetty` and add the matching component dependency.
-
 == Data Format
 
 The `dataFormat` parameter controls what the exchange body contains, following the same convention as the xref:cxf-component.adoc[CXF component]. It applies to both consumer (inbound request) and producer (response) sides.
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
index b2ea643b08d2..17825ef417d3 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
@@ -48,6 +48,7 @@ public class AsyncInputStream implements ReadStream<Buffer> {
     private final Vertx vertx;
     private final Context context;
     private final InboundBuffer<Buffer> queue;
+    private final boolean eagerFlush;
     private long readPos;
     private boolean closed;
     private boolean readInProgress;
@@ -56,9 +57,14 @@ public class AsyncInputStream implements ReadStream<Buffer> {
     private Handler<Throwable> exceptionHandler;
 
     public AsyncInputStream(Vertx vertx, Context context, InputStream inputStream) {
+        this(vertx, context, inputStream, false);
+    }
+
+    public AsyncInputStream(Vertx vertx, Context context, InputStream inputStream, boolean eagerFlush) {
         this.vertx = vertx;
         this.context = context;
         this.channel = Channels.newChannel(inputStream);
+        this.eagerFlush = eagerFlush;
         this.queue = new InboundBuffer<>(context, 0);
         queue.handler(buffer -> {
             if (buffer.length() > 0) {
@@ -222,22 +228,24 @@ public class AsyncInputStream implements ReadStream<Buffer> {
                             // EOF
                             context.runOnContext((v) -> {
                                 buffer.flip();
+                                int written = offset + buffer.limit();
                                 writeBuff.setBytes(offset, buffer);
                                 buffer.compact();
-                                handler.handle(Future.succeededFuture(writeBuff));
+                                handler.handle(Future.succeededFuture(writeBuff.slice(0, written)));
                             });
-                        } else if (buffer.hasRemaining()) {
-                            // Read from the next offset
+                        } else if (!eagerFlush && buffer.hasRemaining()) {
+                            // Read from the next offset to fill the buffer
                             context.runOnContext((v) -> {
                                 doRead(writeBuff, offset, buffer, position + bytesRead, handler);
                             });
                         } else {
-                            // All data is written
+                            // Emit what we have (always for eagerFlush, or when buffer is full)
                             context.runOnContext((v) -> {
                                 buffer.flip();
+                                int written = offset + buffer.limit();
                                 writeBuff.setBytes(offset, buffer);
                                 buffer.compact();
-                                handler.handle(Future.succeededFuture(writeBuff));
+                                handler.handle(Future.succeededFuture(writeBuff.slice(0, written)));
                             });
                         }
                     } else {
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index 3c9eba48653b..b731f93fadd4 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -237,8 +237,12 @@ public final class VertxPlatformHttpSupport {
         Vertx vertx = ctx.vertx();
         Context context = vertx.getOrCreateContext();
 
+        // For SSE streams, flush each chunk immediately instead of greedily filling the buffer
+        String contentType = response.headers().get("Content-Type");
+        boolean eagerFlush = contentType != null && contentType.startsWith("text/event-stream");
+
         // Process the InputStream async to avoid blocking the Vert.x event loop on large responses
-        AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, context, is);
+        AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, context, is, eagerFlush);
         asyncInputStream.exceptionHandler(promise::fail);
         asyncInputStream.endHandler(event -> endHandler(promise, response, asyncInputStream));
 
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSseTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSseTest.java
new file mode 100644
index 000000000000..52ca8e7f147b
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSseTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpMethod;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class VertxPlatformHttpSseTest {
+
+    @Test
+    void testSseEventsDeliveredEagerly() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+
+        // Writer waits on this latch after sending event-1.
+        // If event-1 is flushed eagerly, the client reads it and counts down,
+        // unblocking the writer to send event-2.
+        // If events are batched, the client never sees event-1 until the stream
+        // closes, so the latch times out and the test fails.
+        CountDownLatch event1Received = new CountDownLatch(1);
+        AtomicReference<Throwable> writerError = new AtomicReference<>();
+
+        HttpClient client = null;
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/sse")
+                            .process(exchange -> {
+                                PipedOutputStream out = new PipedOutputStream();
+                                PipedInputStream in = new PipedInputStream(out);
+
+                                exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/event-stream");
+                                exchange.getMessage().setBody(in);
+
+                                Thread writer = new Thread(() -> {
+                                    try {
+                                        out.write("data: event-1\n\n".getBytes(StandardCharsets.UTF_8));
+                                        out.flush();
+
+                                        assertTrue(event1Received.await(5, TimeUnit.SECONDS),
+                                                "Client did not receive event-1 before timeout — events may be buffered");
+
+                                        out.write("data: event-2\n\n".getBytes(StandardCharsets.UTF_8));
+                                        out.flush();
+                                        out.close();
+                                    } catch (Throwable e) {
+                                        writerError.set(e);
+                                        try {
+                                            out.close();
+                                        } catch (Exception ignored) {
+                                        }
+                                    }
+                                });
+                                writer.setDaemon(true);
+                                writer.start();
+                            });
+                }
+            });
+
+            VertxPlatformHttpEngineTest.startCamelContext(context);
+
+            VertxPlatformHttpServer server = context.hasService(VertxPlatformHttpServer.class);
+            client = server.getVertx().createHttpClient();
+
+            StringBuilder received = new StringBuilder();
+            CompletableFuture<Void> streamDone = new CompletableFuture<>();
+
+            client.request(HttpMethod.GET, server.getPort(), "localhost", "/sse")
+                    .onSuccess(request -> {
+                        request.response()
+                                .onSuccess(response -> {
+                                    assertEquals(200, response.statusCode());
+                                    assertEquals("text/event-stream", response.getHeader("Content-Type"));
+
+                                    response.handler(buffer -> {
+                                        String chunk = buffer.toString(StandardCharsets.UTF_8);
+                                        received.append(chunk);
+
+                                        if (chunk.contains("event-1")) {
+                                            event1Received.countDown();
+                                        }
+                                    });
+                                    response.endHandler(v -> streamDone.complete(null));
+                                    response.exceptionHandler(streamDone::completeExceptionally);
+                                })
+                                .onFailure(streamDone::completeExceptionally);
+                        request.end();
+                    })
+                    .onFailure(streamDone::completeExceptionally);
+
+            streamDone.get(10, TimeUnit.SECONDS);
+
+            String[] events = received.toString().split("\n\n");
+            assertEquals(2, events.length);
+            assertEquals("data: event-1", events[0]);
+            assertEquals("data: event-2", events[1]);
+
+            Throwable error = writerError.get();
+            if (error != null) {
+                throw new AssertionError("Writer thread failed", error);
+            }
+        } finally {
+            if (client != null) {
+                client.close().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS);
+            }
+            context.stop();
+        }
+    }
+}
