This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a5ee2e  CAMEL-17694: improve handling of sendToAll header
5a5ee2e is described below

commit 5a5ee2e928ea7b2fb234df75aed1409eb6d47aa8
Author: Sergei Portnov <[email protected]>
AuthorDate: Tue Feb 22 21:39:24 2022 +0300

    CAMEL-17694: improve handling of sendToAll header
---
 .../vertx/websocket/VertxWebsocketProducer.java    | 24 +++++++++-------------
 .../vertx/websocket/VertxWebsocketTest.java        | 10 +++++++++
 2 files changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
 
b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
index 79ae2d2..28bfd8b 100644
--- 
a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
+++ 
b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
@@ -109,27 +109,23 @@ public class VertxWebsocketProducer extends 
DefaultAsyncProducer {
         VertxWebsocketEndpoint endpoint = getEndpoint();
         Message message = exchange.getMessage();
 
-        String connectionKey = 
message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class);
-        if (connectionKey != null) {
-            if (endpoint.isManagedPort()) {
-                Stream.of(connectionKey.split(","))
-                        .forEach(key -> connectedPeers.put(key, 
endpoint.findPeerForConnectionKey(key)));
-            } else {
-                // The producer is invoking an external server not managed by 
camel
-                connectedPeers.put(UUID.randomUUID().toString(), 
endpoint.getWebSocket(exchange));
-            }
-        } else {
-            connectedPeers.put(UUID.randomUUID().toString(), 
endpoint.getWebSocket(exchange));
-        }
-
         boolean isSendToAll = 
message.getHeader(VertxWebsocketConstants.SEND_TO_ALL,
-                getEndpoint().getConfiguration().isSendToAll(), boolean.class);
+                endpoint.getConfiguration().isSendToAll(), boolean.class);
         if (isSendToAll) {
             // Try to find all peers connected to an existing vertx-websocket 
consumer
             Map<String, ServerWebSocket> peers = 
endpoint.findPeersForHostPort();
             if (ObjectHelper.isNotEmpty(peers)) {
                 connectedPeers.putAll(peers);
             }
+        } else {
+            String connectionKey = 
message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class);
+            if (connectionKey != null && endpoint.isManagedPort()) {
+                Stream.of(connectionKey.split(","))
+                        .forEach(key -> connectedPeers.put(key, 
endpoint.findPeerForConnectionKey(key)));
+            } else {
+                // The producer is invoking an external server not managed by 
camel
+                connectedPeers.put(UUID.randomUUID().toString(), 
endpoint.getWebSocket(exchange));
+            }
         }
 
         return connectedPeers;
diff --git 
a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
 
b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
index 78f6458..5897958 100644
--- 
a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
+++ 
b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
@@ -163,6 +163,9 @@ public class VertxWebsocketTest extends 
VertxWebSocketTestSupport {
 
     @Test
     public void testSendToAll() throws Exception {
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+        mockEndpoint.expectedMessageCount(0);
+
         int expectedResultCount = 5;
         CountDownLatch latch = new CountDownLatch(expectedResultCount);
         List<String> results = new ArrayList<>();
@@ -193,10 +196,15 @@ public class VertxWebsocketTest extends 
VertxWebSocketTestSupport {
         for (int i = 1; i <= expectedResultCount; i++) {
             assertTrue(results.contains("Hello World " + i));
         }
+
+        mockEndpoint.assertIsSatisfied(TimeUnit.SECONDS.toMillis(1));
     }
 
     @Test
     public void testSendToAllWithHeader() throws Exception {
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+        mockEndpoint.expectedMessageCount(0);
+
         int expectedResultCount = 5;
         CountDownLatch latch = new CountDownLatch(expectedResultCount);
         List<String> results = new ArrayList<>();
@@ -228,6 +236,8 @@ public class VertxWebsocketTest extends 
VertxWebSocketTestSupport {
         for (int i = 1; i <= expectedResultCount; i++) {
             assertTrue(results.contains("Hello World " + i));
         }
+
+        mockEndpoint.assertIsSatisfied(TimeUnit.SECONDS.toMillis(1));
     }
 
     @Test

Reply via email to