This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit c609203d735db6a54cabc8e92d35005a9f334068 Author: Croway <[email protected]> AuthorDate: Wed Feb 11 11:41:07 2026 +0100 Use awaitility in camel-atmoshpere and add a note about jdk25 streaming --- components/camel-atmosphere-websocket/pom.xml | 6 ++++++ .../main/docs/atmosphere-websocket-component.adoc | 4 ++++ .../atmosphere/websocket/WebsocketRoute1Test.java | 18 +++++++++++----- .../atmosphere/websocket/WebsocketRoute2Test.java | 18 +++++++++++----- .../WebsocketRoute2WithInitParamTest.java | 24 ++++++++++++---------- .../atmosphere/websocket/WebsocketRoute3Test.java | 14 ++++++++----- .../WebsocketRoute3WithInitParamTest.java | 22 ++++++++++++-------- .../atmosphere/websocket/WebsocketRoute4Test.java | 14 ++++++++----- 8 files changed, 80 insertions(+), 40 deletions(-) diff --git a/components/camel-atmosphere-websocket/pom.xml b/components/camel-atmosphere-websocket/pom.xml index ee40caa8c6ca..2380679c85d4 100644 --- a/components/camel-atmosphere-websocket/pom.xml +++ b/components/camel-atmosphere-websocket/pom.xml @@ -71,6 +71,12 @@ <version>${mockito-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility-version}</version> + <scope>test</scope> + </dependency> <!-- test infra --> <dependency> diff --git a/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc b/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc index 3de9702dc6da..cb008cfc3d00 100644 --- a/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc +++ b/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc @@ -21,6 +21,10 @@ connections from external clients). This component uses the https://github.com/Atmosphere/atmosphere[Atmosphere] library to support the Websocket transport in various Servlet containers. +NOTE: On JDK 25 or later, websocket messages are delivered in streaming mode (`Reader`/`InputStream`) even when the `useStreaming` option is not enabled. +This is due to a change in the JDK's internal JSR 356 WebSocket implementation. +The component handles both modes transparently, but if your route processors check the body type, they should account for this difference. + Maven users will need to add the following dependency to their `pom.xml` for this component: diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java index cc43d668152c..da0f7fd09a5e 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java @@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.io.StringReader; +import java.lang.Runtime.Version; import java.util.List; import org.apache.camel.Exchange; @@ -72,24 +74,30 @@ public class WebsocketRoute1Test extends WebsocketCamelRouterTestSupport { // route for a single line from("atmosphere-websocket:///hola").to("log:info").process(new Processor() { public void process(final Exchange exchange) { - createResponse(exchange); + // JDK 25+ delivers websocket messages in streaming mode (Reader/InputStream) + // even without useStreaming=true, due to a change in the JSR 356 implementation + boolean streaming = Runtime.version().compareTo(Version.parse("25")) >= 0; + createResponse(exchange, streaming); } }).to("atmosphere-websocket:///hola"); } }; } - private static void createResponse(Exchange exchange) { + private static void createResponse(Exchange exchange, boolean streaming) { Object msg = exchange.getIn().getBody(); - assertTrue(msg instanceof String || msg instanceof byte[] || msg instanceof Reader || msg instanceof InputStream, - "Expects String, byte[], Reader or InputStream"); + if (streaming) { + assertTrue(msg instanceof Reader || msg instanceof InputStream, "Expects Reader or InputStream"); + } else { + assertTrue(msg instanceof String || msg instanceof byte[], "Expects String or byte[]"); + } if (msg instanceof String) { exchange.getIn().setBody(RESPONSE_GREETING + msg); } else if (msg instanceof byte[]) { exchange.getIn().setBody(createByteResponse((byte[]) msg)); } else if (msg instanceof Reader) { - exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) msg)); + exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + readAll((Reader) msg))); } else if (msg instanceof InputStream) { exchange.getIn().setBody(createByteResponse(readAll((InputStream) msg))); } diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java index cf97b5d5cac4..1f27e08fecf1 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java @@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.io.StringReader; +import java.lang.Runtime.Version; import java.util.List; import org.apache.camel.Exchange; @@ -72,24 +74,30 @@ public class WebsocketRoute2Test extends WebsocketCamelRouterTestSupport { // route for a broadcast line from("atmosphere-websocket:///broadcast").to("log:info").process(new Processor() { public void process(final Exchange exchange) { - createResponse(exchange); + // JDK 25+ delivers websocket messages in streaming mode (Reader/InputStream) + // even without useStreaming=true, due to a change in the JSR 356 implementation + boolean streaming = Runtime.version().compareTo(Version.parse("25")) >= 0; + createResponse(exchange, streaming); } }).to("atmosphere-websocket:///broadcast?sendToAll=true"); } }; } - private static void createResponse(Exchange exchange) { + private static void createResponse(Exchange exchange, boolean streaming) { Object msg = exchange.getIn().getBody(); - assertTrue(msg instanceof String || msg instanceof byte[] || msg instanceof Reader || msg instanceof InputStream, - "Expects String, byte[], Reader or InputStream"); + if (streaming) { + assertTrue(msg instanceof Reader || msg instanceof InputStream, "Expects Reader or InputStream"); + } else { + assertTrue(msg instanceof String || msg instanceof byte[], "Expects String or byte[]"); + } if (msg instanceof String) { exchange.getIn().setBody(RESPONSE_GREETING + msg); } else if (msg instanceof byte[]) { exchange.getIn().setBody(createByteResponse((byte[]) msg)); } else if (msg instanceof Reader) { - exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) msg)); + exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + readAll((Reader) msg))); } else if (msg instanceof InputStream) { exchange.getIn().setBody(createByteResponse(readAll((InputStream) msg))); } diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java index 030d1031fc8d..d0b134e2cf54 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java @@ -20,11 +20,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.infra.common.http.WebsocketTestClient; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -38,7 +40,6 @@ public class WebsocketRoute2WithInitParamTest extends WebsocketCamelRouterWithIn @Test void testWebsocketSingleClientBroadcastMultipleClients() throws Exception { - final int awaitTime = 2; connectionKeyUserMap.clear(); WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2); @@ -46,31 +47,32 @@ public class WebsocketRoute2WithInitParamTest extends WebsocketCamelRouterWithIn WebsocketTestClient wsclient3 = new WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2); wsclient1.connect(); - wsclient1.await(awaitTime); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(1, connectionKeyUserMap.size())); wsclient2.connect(); - wsclient2.await(awaitTime); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(2, connectionKeyUserMap.size())); wsclient3.connect(); - wsclient3.await(awaitTime); - - //all connections were registered in external store - assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size()); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size())); broadcastMessageTo = new String[] { EXISTED_USERS[0], EXISTED_USERS[1] }; wsclient1.sendTextMessage("Gambas"); - wsclient1.await(awaitTime); - List<String> received1 = wsclient1.getReceived(String.class); - assertEquals(1, received1.size()); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(1, wsclient1.getReceived(String.class).size())); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(1, wsclient2.getReceived(String.class).size())); + List<String> received1 = wsclient1.getReceived(String.class); for (String element : broadcastMessageTo) { assertTrue(received1.get(0).contains(element)); } List<String> received2 = wsclient2.getReceived(String.class); - assertEquals(1, received2.size()); for (String element : broadcastMessageTo) { assertTrue(received2.get(0).contains(element)); } diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java index b79e135ec090..2dc9664ce94d 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.io.StringReader; import java.util.List; import org.apache.camel.Exchange; @@ -72,24 +73,27 @@ public class WebsocketRoute3Test extends WebsocketCamelRouterTestSupport { // route for a single stream line from("atmosphere-websocket:///hola3?useStreaming=true").to("log:info").process(new Processor() { public void process(final Exchange exchange) { - createResponse(exchange); + createResponse(exchange, true); } }).to("atmosphere-websocket:///hola3"); } }; } - private static void createResponse(Exchange exchange) { + private static void createResponse(Exchange exchange, boolean streaming) { Object msg = exchange.getIn().getBody(); - assertTrue(msg instanceof String || msg instanceof byte[] || msg instanceof Reader || msg instanceof InputStream, - "Expects String, byte[], Reader or InputStream"); + if (streaming) { + assertTrue(msg instanceof Reader || msg instanceof InputStream, "Expects Reader or InputStream"); + } else { + assertTrue(msg instanceof String || msg instanceof byte[], "Expects String or byte[]"); + } if (msg instanceof String) { exchange.getIn().setBody(RESPONSE_GREETING + msg); } else if (msg instanceof byte[]) { exchange.getIn().setBody(createByteResponse((byte[]) msg)); } else if (msg instanceof Reader) { - exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) msg)); + exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + readAll((Reader) msg))); } else if (msg instanceof InputStream) { exchange.getIn().setBody(createByteResponse(readAll((InputStream) msg))); } diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java index 311bc64f40e4..9aa2dcef7cdd 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java @@ -20,11 +20,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.infra.common.http.WebsocketTestClient; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -38,7 +40,6 @@ public class WebsocketRoute3WithInitParamTest extends WebsocketCamelRouterWithIn @Test void testWebsocketSingleClientBroadcastMultipleClientsGuaranteeDelivery() throws Exception { - final int awaitTime = 2; connectionKeyUserMap.clear(); WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/guarantee", 2); @@ -46,27 +47,30 @@ public class WebsocketRoute3WithInitParamTest extends WebsocketCamelRouterWithIn WebsocketTestClient wsclient3 = new WebsocketTestClient("ws://localhost:" + PORT + "/guarantee", 2); wsclient1.connect(); - wsclient1.await(awaitTime); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(1, connectionKeyUserMap.size())); wsclient2.connect(); - wsclient2.await(awaitTime); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(2, connectionKeyUserMap.size())); wsclient3.connect(); - wsclient3.await(awaitTime); - //all connections were registered in external store - assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size()); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size())); wsclient2.close(); - wsclient2.await(awaitTime); + // brief wait for the close event to be processed server-side + Thread.sleep(500); broadcastMessageTo = new String[] { EXISTED_USERS[0], EXISTED_USERS[1] }; wsclient1.sendTextMessage("Gambas"); - wsclient1.await(awaitTime); + + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(1, wsclient1.getReceived(String.class).size())); List<String> received1 = wsclient1.getReceived(String.class); - assertEquals(1, received1.size()); for (String element : broadcastMessageTo) { assertTrue(received1.get(0).contains(element)); diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java index a5a549d88007..1331d18708ff 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.io.StringReader; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -39,7 +40,7 @@ public class WebsocketRoute4Test extends WebsocketCamelRouterTestSupport { void testWebsocketEventsResendingDisabled() throws Exception { WebsocketTestClient wsclient = new WebsocketTestClient("ws://localhost:" + PORT + "/hola4"); wsclient.connect(); - assertFalse(wsclient.await(10)); + assertFalse(wsclient.await(2)); wsclient.close(); } @@ -58,17 +59,20 @@ public class WebsocketRoute4Test extends WebsocketCamelRouterTestSupport { }; } - private static void createResponse(Exchange exchange) { + private static void createResponse(Exchange exchange, boolean streaming) { Object msg = exchange.getIn().getBody(); - assertTrue(msg instanceof String || msg instanceof byte[] || msg instanceof Reader || msg instanceof InputStream, - "Expects String, byte[], Reader or InputStream"); + if (streaming) { + assertTrue(msg instanceof Reader || msg instanceof InputStream, "Expects Reader or InputStream"); + } else { + assertTrue(msg instanceof String || msg instanceof byte[], "Expects String or byte[]"); + } if (msg instanceof String) { exchange.getIn().setBody(RESPONSE_GREETING + msg); } else if (msg instanceof byte[]) { exchange.getIn().setBody(createByteResponse((byte[]) msg)); } else if (msg instanceof Reader) { - exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) msg)); + exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + readAll((Reader) msg))); } else if (msg instanceof InputStream) { exchange.getIn().setBody(createByteResponse(readAll((InputStream) msg))); }
