This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 512e2757bfe4a8dfb2b2e9f9b2229daa79c86d47 Author: James Netherton <[email protected]> AuthorDate: Tue Feb 15 11:03:47 2022 +0000 CAMEL-17650: camel-vertx-websocket: sendToAll operation should consider the path WebSocket clients are connected to --- .../vertx/websocket/VertxWebsocketEndpoint.java | 21 ++++++++++----------- .../vertx/websocket/VertxWebsocketProducer.java | 18 ++++++++---------- .../vertx/websocket/VertxWebsocketTest.java | 21 +++++++++++++++++++++ 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java index c414395..e84237d 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; @@ -153,23 +154,21 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint { } /** - * Finds all WebSockets associated with a host matching this endpoint configured port + * Finds all WebSockets associated with a host matching this endpoint configured port and resource path */ protected Map<String, ServerWebSocket> findPeersForHostPort() { - Map<VertxWebsocketHostKey, VertxWebsocketHost> registry = getVertxHostRegistry(); - for (VertxWebsocketHost host : registry.values()) { - if (host.getPort() == getConfiguration().getPort()) { - return host.getConnectedPeers(); - } - } - return null; + return getVertxHostRegistry() + .values() + .stream() + .filter(host -> host.getPort() == getConfiguration().getPort()) + .flatMap(host -> host.getConnectedPeers().entrySet().stream()) + .filter(entry -> entry.getValue().path().equals(getConfiguration().getPath())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } protected boolean isManagedPort() { return getVertxHostRegistry().values() .stream() - .filter(host -> host.getPort() == getConfiguration().getPort()) - .findFirst() - .isPresent(); + .anyMatch(host -> host.getPort() == getConfiguration().getPort()); } } diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java index 493d445..79ae2d2 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java @@ -30,6 +30,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.support.DefaultAsyncProducer; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,8 +107,9 @@ public class VertxWebsocketProducer extends DefaultAsyncProducer { private Map<String, WebSocketBase> getConnectedPeers(Exchange exchange) throws Exception { Map<String, WebSocketBase> connectedPeers = new HashMap<>(); VertxWebsocketEndpoint endpoint = getEndpoint(); + Message message = exchange.getMessage(); - String connectionKey = exchange.getMessage().getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class); + String connectionKey = message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class); if (connectionKey != null) { if (endpoint.isManagedPort()) { Stream.of(connectionKey.split(",")) @@ -120,20 +122,16 @@ public class VertxWebsocketProducer extends DefaultAsyncProducer { connectedPeers.put(UUID.randomUUID().toString(), endpoint.getWebSocket(exchange)); } - if (isSendToAll(exchange.getMessage())) { + boolean isSendToAll = message.getHeader(VertxWebsocketConstants.SEND_TO_ALL, + getEndpoint().getConfiguration().isSendToAll(), boolean.class); + if (isSendToAll) { // Try to find all peers connected to an existing vertx-websocket consumer Map<String, ServerWebSocket> peers = endpoint.findPeersForHostPort(); - if (peers != null) { - peers.forEach(connectedPeers::put); + if (ObjectHelper.isNotEmpty(peers)) { + connectedPeers.putAll(peers); } } return connectedPeers; } - - private boolean isSendToAll(Message message) { - Boolean value = message.getHeader(VertxWebsocketConstants.SEND_TO_ALL, getEndpoint().getConfiguration().isSendToAll(), - Boolean.class); - return value == null ? false : value; - } } diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java index 3fcd5e9..78f6458 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java +++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java @@ -176,6 +176,15 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport { }); } + // Open a connection on path /test-other to ensure the 'send to all' operation + // only targeted peers connected on path /test + openWebSocketConnection("localhost", port, "/test-other", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + template.sendBody("vertx-websocket:localhost:" + port + "/test?sendToAll=true", "Hello World"); assertTrue(latch.await(10, TimeUnit.SECONDS)); @@ -201,6 +210,15 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport { }); } + // Open a connection on path /test-other to ensure the 'send to all' operation + // only targeted peers connected on path /test + openWebSocketConnection("localhost", port, "/test-other", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test", "Hello World", VertxWebsocketConstants.SEND_TO_ALL, true); @@ -251,6 +269,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport { .setBody(simple("Hello ${body}")) .to("mock:result"); + fromF("vertx-websocket:localhost:%d/test-other", port) + .setBody(simple("Hello ${body}")); + from("vertx-websocket://greeting") .setBody(simple("Hello ${body}")) .process(new Processor() {
