This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1161439f86f271fdf6c995c11316d3918ab34639 Author: Croway <[email protected]> AuthorDate: Wed Jan 25 18:45:58 2023 +0100 Fix useStreaming keyword and test --- .../websocket/WebsocketEndpointConfigurer.java | 9 +- .../websocket/WebsocketEndpointUriFactory.java | 2 +- .../atmosphere/websocket/atmosphere-websocket.json | 2 +- .../atmosphere/websocket/WebsocketConsumer.java | 2 +- .../atmosphere/websocket/WebsocketEndpoint.java | 10 +- .../atmosphere/websocket/WebsocketRouteTest.java | 10 +- .../websocket/WebsocketRouteWithInitParamTest.java | 139 +++++++++------------ 7 files changed, 76 insertions(+), 98 deletions(-) diff --git a/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointConfigurer.java b/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointConfigurer.java index d817561f479..870be2f87f8 100644 --- a/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointConfigurer.java +++ b/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointConfigurer.java @@ -27,8 +27,7 @@ public class WebsocketEndpointConfigurer extends ServletEndpointConfigurer imple case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "sendtoall": case "sendToAll": target.setSendToAll(property(camelContext, boolean.class, value)); return true; - case "usestreaming": - case "useStreaming": target.setUseStreaming(property(camelContext, boolean.class, value)); return true; + case "streaming": target.setStreaming(property(camelContext, boolean.class, value)); return true; default: return super.configure(camelContext, obj, name, value, ignoreCase); } } @@ -42,8 +41,7 @@ public class WebsocketEndpointConfigurer extends ServletEndpointConfigurer imple case "lazyStartProducer": return boolean.class; case "sendtoall": case "sendToAll": return boolean.class; - case "usestreaming": - case "useStreaming": return boolean.class; + case "streaming": return boolean.class; default: return super.getOptionType(name, ignoreCase); } } @@ -58,8 +56,7 @@ public class WebsocketEndpointConfigurer extends ServletEndpointConfigurer imple case "lazyStartProducer": return target.isLazyStartProducer(); case "sendtoall": case "sendToAll": return target.isSendToAll(); - case "usestreaming": - case "useStreaming": return target.isUseStreaming(); + case "streaming": return target.isStreaming(); default: return super.getOptionValue(obj, name, ignoreCase); } } diff --git a/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointUriFactory.java b/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointUriFactory.java index 70656c71cf5..4a810dbd2f1 100644 --- a/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointUriFactory.java +++ b/components/camel-atmosphere-websocket/src/generated/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpointUriFactory.java @@ -46,9 +46,9 @@ public class WebsocketEndpointUriFactory extends org.apache.camel.support.compon props.add("sendToAll"); props.add("servicePath"); props.add("servletName"); + props.add("streaming"); props.add("traceEnabled"); props.add("transferException"); - props.add("useStreaming"); PROPERTY_NAMES = Collections.unmodifiableSet(props); SECRET_PROPERTY_NAMES = Collections.emptySet(); MULTI_VALUE_PREFIXES = Collections.emptySet(); diff --git a/components/camel-atmosphere-websocket/src/generated/resources/org/apache/camel/component/atmosphere/websocket/atmosphere-websocket.json b/components/camel-atmosphere-websocket/src/generated/resources/org/apache/camel/component/atmosphere/websocket/atmosphere-websocket.json index 66c0abc87a1..3ecdc2691be 100644 --- a/components/camel-atmosphere-websocket/src/generated/resources/org/apache/camel/component/atmosphere/websocket/atmosphere-websocket.json +++ b/components/camel-atmosphere-websocket/src/generated/resources/org/apache/camel/component/atmosphere/websocket/atmosphere-websocket.json @@ -47,8 +47,8 @@ "chunked": { "kind": "parameter", "displayName": "Chunked", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "If this option is false the Servlet will disable the HTTP streaming and set the content-length header on the response" }, "disableStreamCache": { "kind": "parameter", "displayName": "Disable Stream Cache", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Determines whether or not the raw input stream from Servlet is cached or not (Camel will read the stream into a in memory\/overflow to file, Stream caching) cache. By default Camel will cache the Servlet inpu [...] "sendToAll": { "kind": "parameter", "displayName": "Send To All", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to send to all (broadcast) or send to a single receiver." }, + "streaming": { "kind": "parameter", "displayName": "Streaming", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "To enable streaming to send data as multiple text fragments." }, "transferException": { "kind": "parameter", "displayName": "Transfer Exception", "group": "common", "label": "common", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled and an Exchange failed processing on the consumer side, and if the caused Exception was send back serialized in the response as a application\/x-java-serialized-object content type. On the producer si [...] - "useStreaming": { "kind": "parameter", "displayName": "Use Streaming", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "To enable streaming to send data as multiple text fragments." }, "headerFilterStrategy": { "kind": "parameter", "displayName": "Header Filter Strategy", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." }, "httpBinding": { "kind": "parameter", "displayName": "Http Binding", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.http.common.HttpBinding", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HttpBinding to control the mapping between Camel message and HttpClient." }, "async": { "kind": "parameter", "displayName": "Async", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Configure the consumer to work in async mode" }, diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java index ee0d6586fc5..4115c9efa56 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java @@ -62,7 +62,7 @@ public class WebsocketConsumer extends ServletConsumer { this.framework.addInitParameter(ApplicationConfig.ANALYTICS, "false"); this.framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); this.framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL, - getEndpoint().isUseStreaming() ? WebsocketStreamHandler.class.getName() : WebsocketHandler.class.getName()); + getEndpoint().isStreaming() ? WebsocketStreamHandler.class.getName() : WebsocketHandler.class.getName()); this.framework.init(config); WebSocketProtocol wsp = this.framework.getWebSocketProtocol(); diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java index 2ec631602ef..afc64c3e57e 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java @@ -51,7 +51,7 @@ public class WebsocketEndpoint extends ServletEndpoint { @UriParam private boolean sendToAll; @UriParam - private boolean useStreaming; + private boolean streaming; public WebsocketEndpoint(String endPointURI, WebsocketComponent component, URI httpUri) throws URISyntaxException { super(endPointURI, component, httpUri); @@ -86,15 +86,15 @@ public class WebsocketEndpoint extends ServletEndpoint { this.sendToAll = sendToAll; } - public boolean isUseStreaming() { - return useStreaming; + public boolean isStreaming() { + return streaming; } /** * To enable streaming to send data as multiple text fragments. */ - public void setUseStreaming(boolean useStreaming) { - this.useStreaming = useStreaming; + public void setStreaming(boolean streaming) { + this.streaming = streaming; } WebSocketStore getWebSocketStore() { diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java index 312252c1fae..2ad0f94eb43 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java @@ -95,8 +95,8 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport { @Test void testWebsocketBroadcastClient() throws Exception { - WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola2", 2); - WebsocketTestClient wsclient2 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola2", 2); + WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2); + WebsocketTestClient wsclient2 = new WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2); wsclient1.connect(); wsclient2.connect(); @@ -142,14 +142,14 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport { }).to("atmosphere-websocket:///hola"); // route for a broadcast line - from("atmosphere-websocket:///hola2").to("log:info").process(new Processor() { + from("atmosphere-websocket:///broadcast").to("log:info").process(new Processor() { public void process(final Exchange exchange) { createResponse(exchange, false); } - }).to("atmosphere-websocket:///hola2?sendToAll=true"); + }).to("atmosphere-websocket:///broadcast?sendToAll=true"); // route for a single stream line - from("atmosphere-websocket:///hola3?useStreaming=true").to("log:info").process(new Processor() { + from("atmosphere-websocket:///hola3?streaming=true").to("log:info").process(new Processor() { public void process(final Exchange exchange) { createResponse(exchange, true); } diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java index 72834b36fbb..4d4467ce1bd 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java @@ -22,9 +22,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -38,16 +35,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import org.testcontainers.shaded.org.awaitility.Awaitility; - public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithInitParamTestSupport { private static final String[] EXISTED_USERS = { "Kim", "Pavlo", "Peter" }; private static String[] broadcastMessageTo = {}; private static Map<String, String> connectionKeyUserMap = new HashMap<>(); - private ExecutorService executor = Executors.newSingleThreadExecutor(); - private void runtTest(String s) throws InterruptedException, ExecutionException, IOException { WebsocketTestClient wsclient = new WebsocketTestClient("ws://localhost:" + PORT + s); wsclient.connect(); @@ -70,52 +63,46 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni final int awaitTime = 5; connectionKeyUserMap.clear(); - executor.submit(() -> { - try { - WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola2", 2); - WebsocketTestClient wsclient2 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola2", 2); - WebsocketTestClient wsclient3 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola2", 2); + WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2); + WebsocketTestClient wsclient2 = new WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2); + WebsocketTestClient wsclient3 = new WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2); - wsclient1.connect(); - wsclient1.await(awaitTime); + wsclient1.connect(); + wsclient1.await(awaitTime); - wsclient2.connect(); - wsclient2.await(awaitTime); + wsclient2.connect(); + wsclient2.await(awaitTime); - wsclient3.connect(); - wsclient3.await(awaitTime); + wsclient3.connect(); + wsclient3.await(awaitTime); - //all connections were registered in external store - assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size()); + //all connections were registered in external store + assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size()); - broadcastMessageTo = new String[] {EXISTED_USERS[0], EXISTED_USERS[1]}; + broadcastMessageTo = new String[] { EXISTED_USERS[0], EXISTED_USERS[1] }; - wsclient1.sendTextMessage("Gambas"); - wsclient1.await(awaitTime); + wsclient1.sendTextMessage("Gambas"); + wsclient1.await(awaitTime); - List<String> received1 = wsclient1.getReceived(String.class); - assertEquals(1, received1.size()); + List<String> received1 = wsclient1.getReceived(String.class); + assertEquals(1, received1.size()); - for (String element : broadcastMessageTo) { - assertTrue(received1.get(0).contains(element)); - } + for (String element : broadcastMessageTo) { + assertTrue(received1.get(0).contains(element)); + } - List<String> received2 = wsclient2.getReceived(String.class); - assertEquals(1, received2.size()); - for (String element : broadcastMessageTo) { - assertTrue(received2.get(0).contains(element)); - } + List<String> received2 = wsclient2.getReceived(String.class); + assertEquals(1, received2.size()); + for (String element : broadcastMessageTo) { + assertTrue(received2.get(0).contains(element)); + } - List<String> received3 = wsclient3.getReceived(String.class); - assertEquals(0, received3.size()); + List<String> received3 = wsclient3.getReceived(String.class); + assertEquals(0, received3.size()); - wsclient1.close(); - wsclient2.close(); - wsclient3.close(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted thread", e); - } - }).get(); + wsclient1.close(); + wsclient2.close(); + wsclient3.close(); } @Test @@ -123,51 +110,45 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni final int awaitTime = 5; connectionKeyUserMap.clear(); - executor.submit(() -> { - try { - WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola3", 2); - WebsocketTestClient wsclient2 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola3", 2); - WebsocketTestClient wsclient3 = new WebsocketTestClient("ws://localhost:" + PORT + "/hola3", 2); + WebsocketTestClient wsclient1 = new WebsocketTestClient("ws://localhost:" + PORT + "/guarantee", 2); + WebsocketTestClient wsclient2 = new WebsocketTestClient("ws://localhost:" + PORT + "/guarantee", 2); + WebsocketTestClient wsclient3 = new WebsocketTestClient("ws://localhost:" + PORT + "/guarantee", 2); - wsclient1.connect(); - wsclient1.await(awaitTime); + wsclient1.connect(); + wsclient1.await(1); - wsclient2.connect(); - wsclient2.await(awaitTime); + wsclient2.connect(); + wsclient2.await(1); - wsclient3.connect(); - wsclient3.await(awaitTime); + wsclient3.connect(); + wsclient3.await(1); - //all connections were registered in external store - assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size()); + //all connections were registered in external store + assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size()); - wsclient2.close(); - wsclient2.await(awaitTime); + wsclient2.close(); + wsclient2.await(awaitTime); - broadcastMessageTo = new String[] { EXISTED_USERS[0], EXISTED_USERS[1] }; + broadcastMessageTo = new String[] { EXISTED_USERS[0], EXISTED_USERS[1] }; - wsclient1.sendTextMessage("Gambas"); - wsclient1.await(awaitTime); + wsclient1.sendTextMessage("Gambas"); + wsclient1.await(awaitTime); - List<String> received1 = wsclient1.getReceived(String.class); - assertEquals(1, received1.size()); + List<String> received1 = wsclient1.getReceived(String.class); + assertEquals(1, received1.size()); - for (String element : broadcastMessageTo) { - assertTrue(received1.get(0).contains(element)); - } + for (String element : broadcastMessageTo) { + assertTrue(received1.get(0).contains(element)); + } - List<String> received2 = wsclient2.getReceived(String.class); - assertEquals(0, received2.size()); + List<String> received2 = wsclient2.getReceived(String.class); + assertEquals(0, received2.size()); - List<String> received3 = wsclient3.getReceived(String.class); - assertEquals(0, received3.size()); + List<String> received3 = wsclient3.getReceived(String.class); + assertEquals(0, received3.size()); - wsclient1.close(); - wsclient3.close(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted thread", e); - } - }).get(); + wsclient1.close(); + wsclient3.close(); } // START SNIPPET: payload @@ -190,7 +171,7 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni }); // route for single client broadcast to multiple clients - from("atmosphere-websocket:///hola2").to("log:info") + from("atmosphere-websocket:///broadcast").to("log:info") .choice() .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE)) .process(new Processor() { @@ -215,10 +196,10 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni public void process(final Exchange exchange) { createBroadcastMultipleClientsResponse(exchange); } - }).to("atmosphere-websocket:///hola2"); + }).to("atmosphere-websocket:///broadcast"); // route for single client broadcast to multiple clients guarantee delivery - from("atmosphere-websocket:///hola3").to("log:info") + from("atmosphere-websocket:///guarantee").to("log:info") .choice() .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE)) .process(new Processor() { @@ -249,7 +230,7 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni public void process(final Exchange exchange) { createBroadcastMultipleClientsResponse(exchange); } - }).to("atmosphere-websocket:///hola3"); + }).to("atmosphere-websocket:///guarantee"); } }; }
