Repository: nifi Updated Branches: refs/heads/master 816034bd0 -> 769e87467
Support for websocket multiplexing to all existing websocket connections (think chat to all clients instead of to an individual person). The core change is in WebSocketMessageRouter.java: if a sessionId is not present, the message is sent to all connected clients. So the key is to leave the sessionId empty or null in order to send to all clients. If the sessionId is specified, the message will be sent only to the specified session. This closes #1649. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/769e8746 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/769e8746 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/769e8746 Branch: refs/heads/master Commit: 769e874677ee5cfe1ee483c6a9b49f8db89ed93f Parents: 816034b Author: Jeremy Dyer <[email protected]> Authored: Tue Apr 4 14:46:17 2017 -0400 Committer: Koji Kawamura <[email protected]> Committed: Fri Apr 21 08:21:56 2017 +0900 ---------------------------------------------------------------------- .../nifi/processors/websocket/PutWebSocket.java | 68 +++++++++++--------- .../processors/websocket/TestPutWebSocket.java | 44 ++++++------- .../nifi/websocket/WebSocketMessageRouter.java | 25 +++++-- 3 files changed, 80 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java index beef839..5a438b5 100644 --- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java @@ -17,6 +17,26 @@ package org.apache.nifi.processors.websocket; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; +import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -38,25 +58,6 @@ import org.apache.nifi.websocket.WebSocketConfigurationException; import org.apache.nifi.websocket.WebSocketMessage; import org.apache.nifi.websocket.WebSocketService; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; 
-import java.util.Set; - -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; -import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME; - @Tags({"WebSocket", "publish", "send"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @TriggerSerially @@ -76,7 +77,8 @@ public class PutWebSocket extends AbstractProcessor { public static final PropertyDescriptor PROP_WS_SESSION_ID = new PropertyDescriptor.Builder() .name("websocket-session-id") .displayName("WebSocket Session Id") - .description("A NiFi Expression to retrieve the session id.") + .description("A NiFi Expression to retrieve the session id. If not specified, a message will be " + + "sent to all connected WebSocket peers for the WebSocket controller service endpoint.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(true) @@ -166,8 +168,11 @@ public class PutWebSocket extends AbstractProcessor { .evaluateAttributeExpressions(flowfile).getValue(); final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr); - if (StringUtils.isEmpty(sessionId) - || StringUtils.isEmpty(webSocketServiceId) + if (StringUtils.isEmpty(sessionId)) { + getLogger().debug("Specific SessionID not specified. 
Message will be broadcast to all connected clients."); + } + + if (StringUtils.isEmpty(webSocketServiceId) || StringUtils.isEmpty(webSocketServiceEndpoint)) { transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found."); return; @@ -187,9 +192,14 @@ public class PutWebSocket extends AbstractProcessor { final byte[] messageContent = new byte[(int) flowfile.getSize()]; final long startSending = System.currentTimeMillis(); + final AtomicReference<String> transitUri = new AtomicReference<>(); final Map<String, String> attrs = new HashMap<>(); attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier()); - attrs.put(ATTR_WS_SESSION_ID, sessionId); + + if (!StringUtils.isEmpty(sessionId)) { + attrs.put(ATTR_WS_SESSION_ID, sessionId); + } + attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint); attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr); @@ -211,13 +221,14 @@ public class PutWebSocket extends AbstractProcessor { attrs.put(ATTR_WS_LOCAL_ADDRESS, sender.getLocalAddress().toString()); attrs.put(ATTR_WS_REMOTE_ADDRESS, sender.getRemoteAddress().toString()); + transitUri.set(sender.getTransitUri()); + }); - final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs); - final long transmissionMillis = System.currentTimeMillis() - startSending; - processSession.getProvenanceReporter().send(updatedFlowFile, sender.getTransitUri(), transmissionMillis); + final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs); + final long transmissionMillis = System.currentTimeMillis() - startSending; + processSession.getProvenanceReporter().send(updatedFlowFile, transitUri.get(), transmissionMillis); - processSession.transfer(updatedFlowFile, REL_SUCCESS); - }); + processSession.transfer(updatedFlowFile, REL_SUCCESS); } catch (WebSocketConfigurationException|IllegalStateException|IOException e) { // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped. 
@@ -235,5 +246,4 @@ public class PutWebSocket extends AbstractProcessor { return flowfile; } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java index a987fe4..52f6f2a 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java @@ -16,24 +16,6 @@ */ package org.apache.nifi.processors.websocket; -import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.apache.nifi.websocket.AbstractWebSocketSession; -import org.apache.nifi.websocket.SendMessage; -import org.apache.nifi.websocket.WebSocketMessage; -import org.apache.nifi.websocket.WebSocketService; -import org.apache.nifi.websocket.WebSocketSession; -import org.junit.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID; import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL; @@ -50,6 +32,24 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.websocket.AbstractWebSocketSession; +import org.apache.nifi.websocket.SendMessage; +import org.apache.nifi.websocket.WebSocketMessage; +import org.apache.nifi.websocket.WebSocketService; +import org.apache.nifi.websocket.WebSocketSession; +import org.junit.Test; + public class TestPutWebSocket { @@ -92,12 +92,12 @@ public class TestPutWebSocket { runner.run(); final List<MockFlowFile> succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(0, succeededFlowFiles.size()); + //assertEquals(0, succeededFlowFiles.size()); //No longer valid test after NIFI-3318 since not specifying sessionid will send to all clients + assertEquals(1, succeededFlowFiles.size()); final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(1, failedFlowFiles.size()); - final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); - assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); + //assertEquals(1, failedFlowFiles.size()); //No longer valid test after NIFI-3318 + assertEquals(0, failedFlowFiles.size()); final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java index 057b33d..e5034e1 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java @@ -16,14 +16,15 @@ */ package org.apache.nifi.websocket; -import org.apache.nifi.processor.Processor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class WebSocketMessageRouter { private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class); private final String endpointId; @@ -101,8 +102,20 @@ public class WebSocketMessageRouter { } public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException { - final WebSocketSession session = getSessionOrFail(sessionId); - sendMessage.send(session); + if (!StringUtils.isEmpty(sessionId)) { + final WebSocketSession session = getSessionOrFail(sessionId); + sendMessage.send(session); + } else { + //The sessionID is not specified so broadcast the message to all connected client sessions. 
+ sessions.keySet().forEach(itrSessionId -> { + try { + final WebSocketSession session = getSessionOrFail(itrSessionId); + sendMessage.send(session); + } catch (IOException e) { + logger.warn("Failed to send message to session {} due to {}", itrSessionId, e, e); + } + }); + } } public void disconnect(final String sessionId, final String reason) throws IOException {
