This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 32bb460aa00e CAMEL-23804: Fix SSE streaming on
camel-platform-http-vertx
32bb460aa00e is described below
commit 32bb460aa00ef206d18b51d67bf649244820b473
Author: James Netherton <[email protected]>
AuthorDate: Fri Jun 19 16:02:14 2026 +0100
CAMEL-23804: Fix SSE streaming on camel-platform-http-vertx
AsyncInputStream greedily filled its 16KB buffer before emitting data,
which blocked SSE events behind the next queue poll. Add an eagerFlush
mode that emits each chunk immediately, and activate it automatically
for text/event-stream responses. Also slice the output buffer to the
actual bytes written to prevent trailing zeroes on partial reads.
Update camel-a2a docs to remove the Vert.x SSE streaming limitation
now that platform-http delivers events eagerly.
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../camel-a2a/src/main/docs/a2a-component.adoc | 22 +---
.../platform/http/vertx/AsyncInputStream.java | 18 ++-
.../http/vertx/VertxPlatformHttpSupport.java | 6 +-
.../http/vertx/VertxPlatformHttpSseTest.java | 138 +++++++++++++++++++++
4 files changed, 162 insertions(+), 22 deletions(-)
diff --git a/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc
b/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc
index 03c08d7ca30f..4515f9949323 100644
--- a/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc
+++ b/components/camel-ai/camel-a2a/src/main/docs/a2a-component.adoc
@@ -36,7 +36,7 @@ Current Preview limitations:
* Extended Agent Card / `GetExtendedAgentCard` is not implemented in this
first release.
* Consumers validate operation requests by default. Local-only examples in
this page set `validateAuth=false`; network-exposed agents should use an agent
card with `securitySchemes` and `securityRequirements` plus `apiKey`,
`bearerToken`, or `oauthProfile` endpoint configuration.
* The REST binding uses A2A custom-method paths containing colons, such as
`/message:send`. These paths collide on Vert.x/platform-http. Use
`protocolBinding=JSONRPC` with platform-http, or use
`httpServerComponent=undertow` or `httpServerComponent=jetty` for REST
custom-method routes.
-* Real-time SSE streaming is verified with Undertow and Jetty.
Vert.x/platform-http buffers `InputStream` responses and does not deliver
events as they are emitted.
+* Real-time SSE streaming is verified with Vert.x/platform-http, Undertow, and
Jetty.
* The default task store is in-memory and single-JVM. Register a custom
`A2ATaskStore` for durable or cross-node task state.
Maven users will need to add the following dependency to their `pom.xml`:
@@ -51,7 +51,7 @@ Maven users will need to add the following dependency to
their `pom.xml`:
</dependency>
----
-Consumer routes also need an HTTP server component at runtime. For example,
add `camel-platform-http-vertx` for JSON-RPC agents, or `camel-undertow` /
`camel-jetty` for REST custom-method routes and real-time SSE streaming. See
<<Dependencies>> and <<HTTP Server Component Discovery>>.
+Consumer routes also need an HTTP server component at runtime. For example,
add `camel-platform-http-vertx` for JSON-RPC agents (including SSE streaming),
or `camel-undertow` / `camel-jetty` for REST custom-method routes. See
<<Dependencies>> and <<HTTP Server Component Discovery>>.
== URI Format
@@ -249,7 +249,7 @@ The A2A REST protocol uses Google's Custom Method
convention with colons in path
This means operation serving is not reliable with REST binding on
Vert.x/platform-http. For example, `POST /message:stream` can be handled by the
`SendMessage` route instead of the streaming route.
-*Workaround:* Use `protocolBinding=JSONRPC` for agents running on Vert.x (the
default `platform-http`). Alternatively, use `httpServerComponent=jetty` or
`httpServerComponent=undertow`; their routers treat colons as literal
characters. For real-time SSE streaming, use Jetty or Undertow because
Vert.x/platform-http buffers `InputStream` responses.
+*Workaround:* Use `protocolBinding=JSONRPC` for agents running on Vert.x (the
default `platform-http`). Alternatively, use `httpServerComponent=jetty` or
`httpServerComponent=undertow`; their routers treat colons as literal
characters.
=== Authentication
@@ -559,19 +559,11 @@ The consumer supports two SSE streaming patterns:
* *`SendStreamingMessage`* — The stream starts with the submitted `Task`,
followed by progress events from `A2AProgress` or `${a2a:emit()}`. When the
route completes, the body is emitted as the final message.
* *`SubscribeToTask`* — Clients subscribe via `POST /tasks/\{id}:subscribe` to
receive real-time SSE updates for an existing non-terminal task. The component
uses an Event-to-Stream Bridge (`QueueStreamEmitter` + `SseQueueInputStream`)
with heartbeat comments to keep the connection alive. The stream sends the
current `Task` first and ends when the task later reaches a terminal state.
Subscribing to an already-terminal task returns `UnsupportedOperationError`.
-[IMPORTANT]
+[NOTE]
====
*HTTP Server Component for Streaming*
-For real-time SSE streaming, use `httpServerComponent=undertow` or
`httpServerComponent=jetty`. The default Vert.x `platform-http` buffers
`InputStream` responses due to an `AsyncInputStream` greedy-fill loop, causing
all events to arrive at once.
-
-[source,yaml]
-----
-parameters:
- httpServerComponent: undertow
-----
-
-This requires `camel-undertow` (or `camel-jetty`) on the classpath.
+Real-time SSE streaming works with all tested HTTP server components:
Vert.x/platform-http, Undertow, and Jetty.
====
SSE streaming parameters:
@@ -1654,11 +1646,9 @@ The preview component does not auto-create arbitrary
HTTP server components and
Tested behavior in this Preview release:
-* `camel-platform-http` / Vert.x - suitable for JSON-RPC agents and serving
the public agent card. REST custom-method paths containing colons can collide,
and SSE `InputStream` responses are buffered.
+* `camel-platform-http` / Vert.x - suitable for JSON-RPC agents (including SSE
streaming) and serving the public agent card. REST custom-method paths
containing colons can collide.
* `camel-undertow` and `camel-jetty` - suitable for REST custom-method paths
and real-time SSE streaming.
-For SSE streaming, specify `httpServerComponent=undertow` or
`httpServerComponent=jetty` and add the matching component dependency.
-
== Data Format
The `dataFormat` parameter controls what the exchange body contains, following
the same convention as the xref:cxf-component.adoc[CXF component]. It applies
to both consumer (inbound request) and producer (response) sides.
diff --git
a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
index b2ea643b08d2..17825ef417d3 100644
---
a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
+++
b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
@@ -48,6 +48,7 @@ public class AsyncInputStream implements ReadStream<Buffer> {
private final Vertx vertx;
private final Context context;
private final InboundBuffer<Buffer> queue;
+ private final boolean eagerFlush;
private long readPos;
private boolean closed;
private boolean readInProgress;
@@ -56,9 +57,14 @@ public class AsyncInputStream implements ReadStream<Buffer> {
private Handler<Throwable> exceptionHandler;
public AsyncInputStream(Vertx vertx, Context context, InputStream
inputStream) {
+ this(vertx, context, inputStream, false);
+ }
+
+ public AsyncInputStream(Vertx vertx, Context context, InputStream
inputStream, boolean eagerFlush) {
this.vertx = vertx;
this.context = context;
this.channel = Channels.newChannel(inputStream);
+ this.eagerFlush = eagerFlush;
this.queue = new InboundBuffer<>(context, 0);
queue.handler(buffer -> {
if (buffer.length() > 0) {
@@ -222,22 +228,24 @@ public class AsyncInputStream implements
ReadStream<Buffer> {
// EOF
context.runOnContext((v) -> {
buffer.flip();
+ int written = offset + buffer.limit();
writeBuff.setBytes(offset, buffer);
buffer.compact();
-
handler.handle(Future.succeededFuture(writeBuff));
+
handler.handle(Future.succeededFuture(writeBuff.slice(0, written)));
});
- } else if (buffer.hasRemaining()) {
- // Read from the next offset
+ } else if (!eagerFlush && buffer.hasRemaining()) {
+ // Read from the next offset to fill the buffer
context.runOnContext((v) -> {
doRead(writeBuff, offset, buffer, position +
bytesRead, handler);
});
} else {
- // All data is written
+ // Emit what we have (always for eagerFlush, or
when buffer is full)
context.runOnContext((v) -> {
buffer.flip();
+ int written = offset + buffer.limit();
writeBuff.setBytes(offset, buffer);
buffer.compact();
-
handler.handle(Future.succeededFuture(writeBuff));
+
handler.handle(Future.succeededFuture(writeBuff.slice(0, written)));
});
}
} else {
diff --git
a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index 3c9eba48653b..b731f93fadd4 100644
---
a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++
b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -237,8 +237,12 @@ public final class VertxPlatformHttpSupport {
Vertx vertx = ctx.vertx();
Context context = vertx.getOrCreateContext();
+ // For SSE streams, flush each chunk immediately instead of greedily
filling the buffer
+ String contentType = response.headers().get("Content-Type");
+ boolean eagerFlush = contentType != null &&
contentType.startsWith("text/event-stream");
+
// Process the InputStream async to avoid blocking the Vert.x event
loop on large responses
- AsyncInputStream asyncInputStream = new AsyncInputStream(vertx,
context, is);
+ AsyncInputStream asyncInputStream = new AsyncInputStream(vertx,
context, is, eagerFlush);
asyncInputStream.exceptionHandler(promise::fail);
asyncInputStream.endHandler(event -> endHandler(promise, response,
asyncInputStream));
diff --git
a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSseTest.java
b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSseTest.java
new file mode 100644
index 000000000000..52ca8e7f147b
--- /dev/null
+++
b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSseTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpMethod;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class VertxPlatformHttpSseTest {
+
+ @Test
+ void testSseEventsDeliveredEagerly() throws Exception {
+ final CamelContext context =
VertxPlatformHttpEngineTest.createCamelContext();
+
+ // Writer waits on this latch after sending event-1.
+ // If event-1 is flushed eagerly, the client reads it and counts down,
+ // unblocking the writer to send event-2.
+ // If events are batched, the client never sees event-1 until the
stream
+ // closes, so the latch times out and the test fails.
+ CountDownLatch event1Received = new CountDownLatch(1);
+ AtomicReference<Throwable> writerError = new AtomicReference<>();
+
+ HttpClient client = null;
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/sse")
+ .process(exchange -> {
+ PipedOutputStream out = new
PipedOutputStream();
+ PipedInputStream in = new
PipedInputStream(out);
+
+
exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/event-stream");
+ exchange.getMessage().setBody(in);
+
+ Thread writer = new Thread(() -> {
+ try {
+ out.write("data:
event-1\n\n".getBytes(StandardCharsets.UTF_8));
+ out.flush();
+
+ assertTrue(event1Received.await(5,
TimeUnit.SECONDS),
+ "Client did not receive
event-1 before timeout — events may be buffered");
+
+ out.write("data:
event-2\n\n".getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ out.close();
+ } catch (Throwable e) {
+ writerError.set(e);
+ try {
+ out.close();
+ } catch (Exception ignored) {
+ }
+ }
+ });
+ writer.setDaemon(true);
+ writer.start();
+ });
+ }
+ });
+
+ VertxPlatformHttpEngineTest.startCamelContext(context);
+
+ VertxPlatformHttpServer server =
context.hasService(VertxPlatformHttpServer.class);
+ client = server.getVertx().createHttpClient();
+
+ StringBuilder received = new StringBuilder();
+ CompletableFuture<Void> streamDone = new CompletableFuture<>();
+
+ client.request(HttpMethod.GET, server.getPort(), "localhost",
"/sse")
+ .onSuccess(request -> {
+ request.response()
+ .onSuccess(response -> {
+ assertEquals(200, response.statusCode());
+ assertEquals("text/event-stream",
response.getHeader("Content-Type"));
+
+ response.handler(buffer -> {
+ String chunk =
buffer.toString(StandardCharsets.UTF_8);
+ received.append(chunk);
+
+ if (chunk.contains("event-1")) {
+ event1Received.countDown();
+ }
+ });
+ response.endHandler(v ->
streamDone.complete(null));
+
response.exceptionHandler(streamDone::completeExceptionally);
+ })
+ .onFailure(streamDone::completeExceptionally);
+ request.end();
+ })
+ .onFailure(streamDone::completeExceptionally);
+
+ streamDone.get(10, TimeUnit.SECONDS);
+
+ String[] events = received.toString().split("\n\n");
+ assertEquals(2, events.length);
+ assertEquals("data: event-1", events[0]);
+ assertEquals("data: event-2", events[1]);
+
+ Throwable error = writerError.get();
+ if (error != null) {
+ throw new AssertionError("Writer thread failed", error);
+ }
+ } finally {
+ if (client != null) {
+
client.close().toCompletionStage().toCompletableFuture().get(5,
TimeUnit.SECONDS);
+ }
+ context.stop();
+ }
+ }
+}