This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 46bb7d153d NIFI-9558: ConnectWebSocket leaks connections and 
duplicates FlowFiles in incoming connection mode (new PR)
46bb7d153d is described below

commit 46bb7d153d5b17db77fc70042396973cbfa561a9
Author: Lehel <[email protected]>
AuthorDate: Thu Jan 13 00:15:00 2022 +0100

    NIFI-9558: ConnectWebSocket leaks connections and duplicates FlowFiles in 
incoming connection mode (new PR)
    
    This closes #6176.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../AbstractWebSocketGatewayProcessor.java         | 33 +++++++++++-----------
 .../processors/websocket/ConnectWebSocket.java     |  4 ++-
 .../additionalDetails.html                         |  3 +-
 .../processors/websocket/TestConnectWebSocket.java |  8 ++----
 4 files changed, 23 insertions(+), 25 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 33129bfbc0..9824c4a6d2 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -185,28 +185,27 @@ public abstract class AbstractWebSocketGatewayProcessor 
extends AbstractSessionF
         }
 
         if (!isProcessorRegisteredToService()) {
-            try {
-                registerProcessorToService(context, webSocketService -> 
onWebSocketServiceReady(webSocketService, context));
-            } catch (IOException | WebSocketConfigurationException e) {
-                // Deregister processor if it failed so that it can retry next 
onTrigger.
-                deregister();
-                context.yield();
-                throw new ProcessException("Failed to register processor to 
WebSocket service due to: " + e, e);
-            }
-
-        } else if (context.hasIncomingConnection()) {
-            try {
-                onWebSocketServiceReady(webSocketService, context);
-            } catch (IOException e) {
-                deregister();
-                context.yield();
-                throw new ProcessException("Failed to renew session and 
connect to WebSocket service due to: " + e, e);
-            }
+            register(context);
+        } else if (webSocketService instanceof WebSocketClientService && 
context.hasIncomingConnection()) {
+            // Deregister processor to close previous sessions when flowfile 
is provided.
+            deregister();
+            register(context);
         }
 
         context.yield();//nothing really to do here since handling WebSocket 
messages is done at ControllerService.
     }
 
+    private void register(ProcessContext context) {
+        try {
+            registerProcessorToService(context, webSocketService -> 
onWebSocketServiceReady(webSocketService, context));
+        } catch (IOException | WebSocketConfigurationException e) {
+            // Deregister processor if it failed so that it can retry next 
onTrigger.
+            deregister();
+            context.yield();
+            throw new ProcessException("Failed to register processor to 
WebSocket service due to: " + e, e);
+        }
+    }
+
 
     private void enqueueMessage(final WebSocketMessage incomingMessage) {
         final ProcessSession session = processSessionFactory.createSession();
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
index dfbd9c482c..be53854d7b 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
@@ -48,7 +48,9 @@ import static 
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.
 @TriggerSerially
 @CapabilityDescription("Acts as a WebSocket client endpoint to interact with a 
remote WebSocket server." +
         " FlowFiles are transferred to downstream relationships according to 
received message types" +
-        " as WebSocket client configured with this processor receives messages 
from remote WebSocket server.")
+        " as WebSocket client configured with this processor receives messages 
from remote WebSocket server." +
+        " If a new flowfile is passed to the processor, the previous sessions 
will be closed and any data being" +
+        " sent will be aborted.")
 @WritesAttributes({
         @WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket 
Controller Service id."),
         @WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = 
"Established WebSocket session id."),
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
index 8c4c2e8135..ecb7c0a176 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
@@ -35,7 +35,8 @@
 <p>
     You can define custom websocket headers in the incoming flowfile as 
additional attributes. The attribute key
     shall start with "header." and continue with they header key. For example: 
"header.Authorization". The attribute
-    value will be the corresponding header value.
+    value will be the corresponding header value. If a new flowfile is passed 
to the processor, the previous sessions will be closed,
+    and any data being sent will be aborted.
 <ol>
     <li>header.Autorization | Basic base64UserNamePassWord</li>
     <li>header.Content-Type | application, audio, example</li>
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
index dbefe9befc..01c7b8079b 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
@@ -44,7 +44,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -52,7 +51,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 
-public class TestConnectWebSocket extends TestListenWebSocket {
+class TestConnectWebSocket extends TestListenWebSocket {
 
     @Test
     public void testSuccess() throws Exception {
@@ -126,7 +125,7 @@ public class TestConnectWebSocket extends 
TestListenWebSocket {
     }
 
     @Test
-    public void testDynamicUrlsParsedFromFlowFileAndAbleToConnect() throws 
InitializationException {
+    void testDynamicUrlsParsedFromFlowFileAndAbleToConnect() throws 
InitializationException {
         // Start websocket server
         final int port = NetworkUtils.availablePort();
         TestRunner webSocketListener = getListenWebSocket(port);
@@ -163,9 +162,6 @@ public class TestConnectWebSocket extends 
TestListenWebSocket {
         final List<MockFlowFile> flowFilesForRelationship = 
runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
         assertEquals(1, flowFilesForRelationship.size());
 
-        final AssertionError assertionError = 
assertThrows(AssertionError.class, () -> runner.run(1));
-        
assertTrue(assertionError.getCause().getLocalizedMessage().contains("Failed to 
renew session and connect to WebSocket service"));
-
         runner.stop();
         webSocketListener.stop();
     }

Reply via email to