This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 512e2757bfe4a8dfb2b2e9f9b2229daa79c86d47
Author: James Netherton <[email protected]>
AuthorDate: Tue Feb 15 11:03:47 2022 +0000

    CAMEL-17650: camel-vertx-websocket: sendToAll operation should consider the path WebSocket clients are connected to
---
 .../vertx/websocket/VertxWebsocketEndpoint.java     | 21 ++++++++++-----------
 .../vertx/websocket/VertxWebsocketProducer.java     | 18 ++++++++----------
 .../vertx/websocket/VertxWebsocketTest.java         | 21 +++++++++++++++++++++
 3 files changed, 39 insertions(+), 21 deletions(-)

diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
index c414395..e84237d 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpClient;
@@ -153,23 +154,21 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Finds all WebSockets associated with a host matching this endpoint 
configured port
+     * Finds all WebSockets associated with a host matching this endpoint 
configured port and resource path
      */
     protected Map<String, ServerWebSocket> findPeersForHostPort() {
-        Map<VertxWebsocketHostKey, VertxWebsocketHost> registry = 
getVertxHostRegistry();
-        for (VertxWebsocketHost host : registry.values()) {
-            if (host.getPort() == getConfiguration().getPort()) {
-                return host.getConnectedPeers();
-            }
-        }
-        return null;
+        return getVertxHostRegistry()
+                .values()
+                .stream()
+                .filter(host -> host.getPort() == getConfiguration().getPort())
+                .flatMap(host -> host.getConnectedPeers().entrySet().stream())
+                .filter(entry -> 
entry.getValue().path().equals(getConfiguration().getPath()))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
     }
 
     protected boolean isManagedPort() {
         return getVertxHostRegistry().values()
                 .stream()
-                .filter(host -> host.getPort() == getConfiguration().getPort())
-                .findFirst()
-                .isPresent();
+                .anyMatch(host -> host.getPort() == 
getConfiguration().getPort());
     }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
index 493d445..79ae2d2 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
@@ -30,6 +30,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,8 +107,9 @@ public class VertxWebsocketProducer extends DefaultAsyncProducer {
     private Map<String, WebSocketBase> getConnectedPeers(Exchange exchange) 
throws Exception {
         Map<String, WebSocketBase> connectedPeers = new HashMap<>();
         VertxWebsocketEndpoint endpoint = getEndpoint();
+        Message message = exchange.getMessage();
 
-        String connectionKey = 
exchange.getMessage().getHeader(VertxWebsocketConstants.CONNECTION_KEY, 
String.class);
+        String connectionKey = 
message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class);
         if (connectionKey != null) {
             if (endpoint.isManagedPort()) {
                 Stream.of(connectionKey.split(","))
@@ -120,20 +122,16 @@ public class VertxWebsocketProducer extends DefaultAsyncProducer {
             connectedPeers.put(UUID.randomUUID().toString(), 
endpoint.getWebSocket(exchange));
         }
 
-        if (isSendToAll(exchange.getMessage())) {
+        boolean isSendToAll = 
message.getHeader(VertxWebsocketConstants.SEND_TO_ALL,
+                getEndpoint().getConfiguration().isSendToAll(), boolean.class);
+        if (isSendToAll) {
             // Try to find all peers connected to an existing vertx-websocket 
consumer
             Map<String, ServerWebSocket> peers = 
endpoint.findPeersForHostPort();
-            if (peers != null) {
-                peers.forEach(connectedPeers::put);
+            if (ObjectHelper.isNotEmpty(peers)) {
+                connectedPeers.putAll(peers);
             }
         }
 
         return connectedPeers;
     }
-
-    private boolean isSendToAll(Message message) {
-        Boolean value = message.getHeader(VertxWebsocketConstants.SEND_TO_ALL, 
getEndpoint().getConfiguration().isSendToAll(),
-                Boolean.class);
-        return value == null ? false : value;
-    }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
index 3fcd5e9..78f6458 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
@@ -176,6 +176,15 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
             });
         }
 
+        // Open a connection on path /test-other to ensure the 'send to all' 
operation
+        // only targeted peers connected on path /test
+        openWebSocketConnection("localhost", port, "/test-other", message -> {
+            synchronized (latch) {
+                results.add(message + " " + latch.getCount());
+                latch.countDown();
+            }
+        });
+
         template.sendBody("vertx-websocket:localhost:" + port + 
"/test?sendToAll=true", "Hello World");
 
         assertTrue(latch.await(10, TimeUnit.SECONDS));
@@ -201,6 +210,15 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
             });
         }
 
+        // Open a connection on path /test-other to ensure the 'send to all' 
operation
+        // only targeted peers connected on path /test
+        openWebSocketConnection("localhost", port, "/test-other", message -> {
+            synchronized (latch) {
+                results.add(message + " " + latch.getCount());
+                latch.countDown();
+            }
+        });
+
         template.sendBodyAndHeader("vertx-websocket:localhost:" + port + 
"/test", "Hello World",
                 VertxWebsocketConstants.SEND_TO_ALL, true);
 
@@ -251,6 +269,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
                         .setBody(simple("Hello ${body}"))
                         .to("mock:result");
 
+                fromF("vertx-websocket:localhost:%d/test-other", port)
+                        .setBody(simple("Hello ${body}"));
+
                 from("vertx-websocket://greeting")
                         .setBody(simple("Hello ${body}"))
                         .process(new Processor() {

Reply via email to