Repository: nifi
Updated Branches:
  refs/heads/master 816034bd0 -> 769e87467


Support for websocket multiplexing to all existing websocket
connections (think of a chat message sent to all clients instead of to an
individual person). The core change is in WebSocketMessageRouter.java:
if a sessionId is not present, the message is sent to all connected
clients. So the key is to leave the sessionId empty or null in order to
send to all clients. If the sessionId is specified, the message will be
sent only to that session.

This closes #1649.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/769e8746
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/769e8746
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/769e8746

Branch: refs/heads/master
Commit: 769e874677ee5cfe1ee483c6a9b49f8db89ed93f
Parents: 816034b
Author: Jeremy Dyer <[email protected]>
Authored: Tue Apr 4 14:46:17 2017 -0400
Committer: Koji Kawamura <[email protected]>
Committed: Fri Apr 21 08:21:56 2017 +0900

----------------------------------------------------------------------
 .../nifi/processors/websocket/PutWebSocket.java | 68 +++++++++++---------
 .../processors/websocket/TestPutWebSocket.java  | 44 ++++++-------
 .../nifi/websocket/WebSocketMessageRouter.java  | 25 +++++--
 3 files changed, 80 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
index beef839..5a438b5 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java
@@ -17,6 +17,26 @@
 
 package org.apache.nifi.processors.websocket;
 
+import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
+import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
+import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
+import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
+import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
+import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
+import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
+import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -38,25 +58,6 @@ import 
org.apache.nifi.websocket.WebSocketConfigurationException;
 import org.apache.nifi.websocket.WebSocketMessage;
 import org.apache.nifi.websocket.WebSocketService;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
-import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
-import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
-import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
-import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
-import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
-import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
-import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME;
-
 @Tags({"WebSocket", "publish", "send"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @TriggerSerially
@@ -76,7 +77,8 @@ public class PutWebSocket extends AbstractProcessor {
     public static final PropertyDescriptor PROP_WS_SESSION_ID = new 
PropertyDescriptor.Builder()
             .name("websocket-session-id")
             .displayName("WebSocket Session Id")
-            .description("A NiFi Expression to retrieve the session id.")
+            .description("A NiFi Expression to retrieve the session id. If not 
specified, a message will be " +
+                    "sent to all connected WebSocket peers for the WebSocket 
controller service endpoint.")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(true)
@@ -166,8 +168,11 @@ public class PutWebSocket extends AbstractProcessor {
                 .evaluateAttributeExpressions(flowfile).getValue();
         final WebSocketMessage.Type messageType = 
WebSocketMessage.Type.valueOf(messageTypeStr);
 
-        if (StringUtils.isEmpty(sessionId)
-                || StringUtils.isEmpty(webSocketServiceId)
+        if (StringUtils.isEmpty(sessionId)) {
+            getLogger().debug("Specific SessionID not specified. Message will 
be broadcast to all connected clients.");
+        }
+
+        if (StringUtils.isEmpty(webSocketServiceId)
                 || StringUtils.isEmpty(webSocketServiceEndpoint)) {
             transferToFailure(processSession, flowfile, "Required WebSocket 
attribute was not found.");
             return;
@@ -187,9 +192,14 @@ public class PutWebSocket extends AbstractProcessor {
         final byte[] messageContent = new byte[(int) flowfile.getSize()];
         final long startSending = System.currentTimeMillis();
 
+        final AtomicReference<String> transitUri = new AtomicReference<>();
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier());
-        attrs.put(ATTR_WS_SESSION_ID, sessionId);
+
+        if (!StringUtils.isEmpty(sessionId)) {
+            attrs.put(ATTR_WS_SESSION_ID, sessionId);
+        }
+
         attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint);
         attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr);
 
@@ -211,13 +221,14 @@ public class PutWebSocket extends AbstractProcessor {
 
                 attrs.put(ATTR_WS_LOCAL_ADDRESS, 
sender.getLocalAddress().toString());
                 attrs.put(ATTR_WS_REMOTE_ADDRESS, 
sender.getRemoteAddress().toString());
+                transitUri.set(sender.getTransitUri());
+            });
 
-                final FlowFile updatedFlowFile = 
processSession.putAllAttributes(flowfile, attrs);
-                final long transmissionMillis = System.currentTimeMillis() - 
startSending;
-                processSession.getProvenanceReporter().send(updatedFlowFile, 
sender.getTransitUri(), transmissionMillis);
+            final FlowFile updatedFlowFile = 
processSession.putAllAttributes(flowfile, attrs);
+            final long transmissionMillis = System.currentTimeMillis() - 
startSending;
+            processSession.getProvenanceReporter().send(updatedFlowFile, 
transitUri.get(), transmissionMillis);
 
-                processSession.transfer(updatedFlowFile, REL_SUCCESS);
-            });
+            processSession.transfer(updatedFlowFile, REL_SUCCESS);
 
         } catch 
(WebSocketConfigurationException|IllegalStateException|IOException e) {
             // WebSocketConfigurationException: If the corresponding 
WebSocketGatewayProcessor has been stopped.
@@ -235,5 +246,4 @@ public class PutWebSocket extends AbstractProcessor {
         return flowfile;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
index a987fe4..52f6f2a 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java
@@ -16,24 +16,6 @@
  */
 package org.apache.nifi.processors.websocket;
 
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.apache.nifi.websocket.AbstractWebSocketSession;
-import org.apache.nifi.websocket.SendMessage;
-import org.apache.nifi.websocket.WebSocketMessage;
-import org.apache.nifi.websocket.WebSocketService;
-import org.apache.nifi.websocket.WebSocketSession;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
 import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
 import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
@@ -50,6 +32,24 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.websocket.AbstractWebSocketSession;
+import org.apache.nifi.websocket.SendMessage;
+import org.apache.nifi.websocket.WebSocketMessage;
+import org.apache.nifi.websocket.WebSocketService;
+import org.apache.nifi.websocket.WebSocketSession;
+import org.junit.Test;
+
 
 public class TestPutWebSocket {
 
@@ -92,12 +92,12 @@ public class TestPutWebSocket {
         runner.run();
 
         final List<MockFlowFile> succeededFlowFiles = 
runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS);
-        assertEquals(0, succeededFlowFiles.size());
+        //assertEquals(0, succeededFlowFiles.size());   //No longer valid test 
after NIFI-3318 since not specifying sessionid will send to all clients
+        assertEquals(1, succeededFlowFiles.size());
 
         final List<MockFlowFile> failedFlowFiles = 
runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE);
-        assertEquals(1, failedFlowFiles.size());
-        final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next();
-        assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL));
+        //assertEquals(1, failedFlowFiles.size());      //No longer valid test 
after NIFI-3318
+        assertEquals(0, failedFlowFiles.size());
 
         final List<ProvenanceEventRecord> provenanceEvents = 
runner.getProvenanceEvents();
         assertEquals(0, provenanceEvents.size());

http://git-wip-us.apache.org/repos/asf/nifi/blob/769e8746/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
index 057b33d..e5034e1 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
@@ -16,14 +16,15 @@
  */
 package org.apache.nifi.websocket;
 
-import org.apache.nifi.processor.Processor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.Processor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class WebSocketMessageRouter {
     private static final Logger logger = 
LoggerFactory.getLogger(WebSocketMessageRouter.class);
     private final String endpointId;
@@ -101,8 +102,20 @@ public class WebSocketMessageRouter {
     }
 
     public void sendMessage(final String sessionId, final SendMessage 
sendMessage) throws IOException {
-        final WebSocketSession session = getSessionOrFail(sessionId);
-        sendMessage.send(session);
+        if (!StringUtils.isEmpty(sessionId)) {
+            final WebSocketSession session = getSessionOrFail(sessionId);
+            sendMessage.send(session);
+        } else {
+            //The sessionID is not specified so broadcast the message to all 
connected client sessions.
+            sessions.keySet().forEach(itrSessionId -> {
+                try {
+                    final WebSocketSession session = 
getSessionOrFail(itrSessionId);
+                    sendMessage.send(session);
+                } catch (IOException e) {
+                    logger.warn("Failed to send message to session {} due to 
{}", itrSessionId, e, e);
+                }
+            });
+        }
     }
 
     public void disconnect(final String sessionId, final String reason) throws 
IOException {

Reply via email to