This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 5a5ee2e CAMEL-17694: improve handling of sendToAll header
5a5ee2e is described below
commit 5a5ee2e928ea7b2fb234df75aed1409eb6d47aa8
Author: Sergei Portnov <[email protected]>
AuthorDate: Tue Feb 22 21:39:24 2022 +0300
CAMEL-17694: improve handling of sendToAll header
---
.../vertx/websocket/VertxWebsocketProducer.java | 24 +++++++++-------------
.../vertx/websocket/VertxWebsocketTest.java | 10 +++++++++
2 files changed, 20 insertions(+), 14 deletions(-)
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
index 79ae2d2..28bfd8b 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketProducer.java
@@ -109,27 +109,23 @@ public class VertxWebsocketProducer extends DefaultAsyncProducer {
VertxWebsocketEndpoint endpoint = getEndpoint();
Message message = exchange.getMessage();
- String connectionKey = message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class);
- if (connectionKey != null) {
- if (endpoint.isManagedPort()) {
- Stream.of(connectionKey.split(","))
- .forEach(key -> connectedPeers.put(key, endpoint.findPeerForConnectionKey(key)));
- } else {
- // The producer is invoking an external server not managed by camel
- connectedPeers.put(UUID.randomUUID().toString(), endpoint.getWebSocket(exchange));
- }
- } else {
- connectedPeers.put(UUID.randomUUID().toString(), endpoint.getWebSocket(exchange));
- }
-
boolean isSendToAll = message.getHeader(VertxWebsocketConstants.SEND_TO_ALL,
- getEndpoint().getConfiguration().isSendToAll(), boolean.class);
+ endpoint.getConfiguration().isSendToAll(), boolean.class);
if (isSendToAll) {
// Try to find all peers connected to an existing vertx-websocket consumer
Map<String, ServerWebSocket> peers = endpoint.findPeersForHostPort();
if (ObjectHelper.isNotEmpty(peers)) {
connectedPeers.putAll(peers);
}
+ } else {
+ String connectionKey = message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class);
+ if (connectionKey != null && endpoint.isManagedPort()) {
+ Stream.of(connectionKey.split(","))
+ .forEach(key -> connectedPeers.put(key, endpoint.findPeerForConnectionKey(key)));
+ } else {
+ // The producer is invoking an external server not managed by camel
+ connectedPeers.put(UUID.randomUUID().toString(), endpoint.getWebSocket(exchange));
+ }
}
return connectedPeers;
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
index 78f6458..5897958 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
@@ -163,6 +163,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
@Test
public void testSendToAll() throws Exception {
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+ mockEndpoint.expectedMessageCount(0);
+
int expectedResultCount = 5;
CountDownLatch latch = new CountDownLatch(expectedResultCount);
List<String> results = new ArrayList<>();
@@ -193,10 +196,15 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
for (int i = 1; i <= expectedResultCount; i++) {
assertTrue(results.contains("Hello World " + i));
}
+
+ mockEndpoint.assertIsSatisfied(TimeUnit.SECONDS.toMillis(1));
}
@Test
public void testSendToAllWithHeader() throws Exception {
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+ mockEndpoint.expectedMessageCount(0);
+
int expectedResultCount = 5;
CountDownLatch latch = new CountDownLatch(expectedResultCount);
List<String> results = new ArrayList<>();
@@ -228,6 +236,8 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
for (int i = 1; i <= expectedResultCount; i++) {
assertTrue(results.contains("Hello World " + i));
}
+
+ mockEndpoint.assertIsSatisfied(TimeUnit.SECONDS.toMillis(1));
}
@Test