This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e4cb5a3615 NIFI-14013 Add disconnected Relationship to ConnectWebSocket (#9533)
e4cb5a3615 is described below

commit e4cb5a36159165c6c0aea2150b2c251519ecb11f
Author: POliveNokia <[email protected]>
AuthorDate: Sat Dec 14 14:18:53 2024 +0000

    NIFI-14013 Add disconnected Relationship to ConnectWebSocket (#9533)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../AbstractWebSocketGatewayProcessor.java         | 17 +++++++
 .../processors/websocket/TestConnectWebSocket.java | 53 +++++++++++++++++++++-
 .../apache/nifi/websocket/ConnectedListener.java   |  1 +
 ...ener.java => WebSocketDisconnectedMessage.java} | 47 ++++++++++---------
 .../nifi/websocket/WebSocketMessageRouter.java     |  3 ++
 5 files changed, 96 insertions(+), 25 deletions(-)

diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 784745465c..55921de2b3 100644
--- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -35,6 +35,7 @@ import org.apache.nifi.websocket.TextMessageConsumer;
 import org.apache.nifi.websocket.WebSocketClientService;
 import org.apache.nifi.websocket.WebSocketConfigurationException;
 import org.apache.nifi.websocket.WebSocketConnectedMessage;
+import org.apache.nifi.websocket.WebSocketDisconnectedMessage;
 import org.apache.nifi.websocket.WebSocketMessage;
 import org.apache.nifi.websocket.WebSocketService;
 import org.apache.nifi.websocket.WebSocketSessionInfo;
@@ -67,6 +68,12 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
             .description("The WebSocket session is established")
             .build();
 
+    public static final Relationship REL_DISCONNECTED = new Relationship.Builder()
+            .name("disconnected")
+            .description("The WebSocket session is disconnected")
+            .autoTerminateDefault(true)
+            .build();
+
     public static final Relationship REL_MESSAGE_TEXT = new Relationship.Builder()
             .name("text message")
             .description("The WebSocket text message output")
@@ -92,6 +99,7 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
     static Set<Relationship> getAbstractRelationships() {
         final Set<Relationship> relationships = new HashSet<>();
         relationships.add(REL_CONNECTED);
+        relationships.add(REL_DISCONNECTED);
         relationships.add(REL_MESSAGE_TEXT);
         relationships.add(REL_MESSAGE_BINARY);
         return relationships;
@@ -114,6 +122,13 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
         enqueueMessage(message);
     }
 
+    @Override
+    public void disconnected(WebSocketSessionInfo sessionInfo) {
+        final WebSocketMessage message = new WebSocketDisconnectedMessage(sessionInfo);
+        sessionInfo.setTransitUri(getTransitUri(sessionInfo));
+        enqueueMessage(message);
+    }
+
     @Override
     public void consume(WebSocketSessionInfo sessionInfo, String messageStr) {
         final WebSocketMessage message = new WebSocketMessage(sessionInfo);
@@ -252,6 +267,8 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
 
             if (incomingMessage instanceof WebSocketConnectedMessage) {
                 session.transfer(messageFlowFile, REL_CONNECTED);
+            } else if (incomingMessage instanceof WebSocketDisconnectedMessage) {
+                session.transfer(messageFlowFile, REL_DISCONNECTED);
             } else {
                 switch (Objects.requireNonNull(messageType)) {
                     case TEXT:
diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
index 3cc237c614..170baa7082 100644
--- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
+++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
@@ -91,6 +91,7 @@ class TestConnectWebSocket extends TestListenWebSocket {
             processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
             processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
             processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
+            processor.disconnected(webSocketSession);
             return null;
         }).when(service).connect(endpointId);
         runner.addControllerService(serviceId, service);
@@ -119,7 +120,7 @@ class TestConnectWebSocket extends TestListenWebSocket {
         binaryFlowFiles.forEach(ff -> assertFlowFile(webSocketSession, serviceId, endpointId, ff, WebSocketMessage.Type.BINARY));
 
         final List<ProvenanceEventRecord> provenanceEvents = sharedSessionState.getProvenanceEvents();
-        assertEquals(6, provenanceEvents.size());
+        assertEquals(7, provenanceEvents.size());
         assertTrue(provenanceEvents.stream().allMatch(event -> ProvenanceEventType.RECEIVE.equals(event.getEventType())));
     }
 
@@ -201,6 +202,56 @@ class TestConnectWebSocket extends TestListenWebSocket {
         runner.stop();
     }
 
+    @Test
+    void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws InitializationException {
+        // Start websocket server
+        final TestRunner webSocketListener = TestRunners.newTestRunner(ListenWebSocket.class);
+
+        final String serverId = "ws-server-service";
+        JettyWebSocketServer server = new JettyWebSocketServer();
+        webSocketListener.addControllerService(serverId, server);
+        webSocketListener.setProperty(server, JettyWebSocketServer.LISTEN_PORT, "0");
+        webSocketListener.enableControllerService(server);
+
+        webSocketListener.setProperty(ListenWebSocket.PROP_WEBSOCKET_SERVER_SERVICE, serverId);
+        webSocketListener.setProperty(ListenWebSocket.PROP_SERVER_URL_PATH, "/test");
+
+        webSocketListener.run(1, false);
+        final int listeningPort = server.getListeningPort();
+
+        final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class);
+
+        final String clientId = "ws-service";
+        final String endpointId = "client-1";
+
+        MockFlowFile flowFile = getFlowFile();
+        runner.enqueue(flowFile);
+
+        JettyWebSocketClient client = new JettyWebSocketClient();
+
+
+        runner.addControllerService(clientId, client);
+        runner.setProperty(client, JettyWebSocketClient.WS_URI, String.format("ws://localhost:%s/${dynamicUrlPart}", listeningPort));
+        runner.enableControllerService(client);
+
+        runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_SERVICE, clientId);
+        runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_ID, endpointId);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFilesForConnectedRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
+        assertEquals(1, flowFilesForConnectedRelationship.size());
+
+        final List<MockFlowFile> flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_SUCCESS);
+        assertEquals(1, flowFilesForSuccess.size());
+
+        webSocketListener.disableControllerService(server);
+
+        final List<MockFlowFile> flowFilesForDisconnectedRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_DISCONNECTED);
+        assertEquals(1, flowFilesForDisconnectedRelationship.size());
+
+        runner.stop();
+    }
 
     private MockFlowFile getFlowFile() {
         Map<String, String> attributes = new HashMap<>();
diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
index b682094da6..124606c373 100644
--- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
+++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
@@ -21,4 +21,5 @@ package org.apache.nifi.websocket;
  */
 public interface ConnectedListener {
     void connected(final WebSocketSessionInfo sessionInfo);
+    void disconnected(final WebSocketSessionInfo sessionInfo);
 }
diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java
similarity index 80%
copy from nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
copy to nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java
index b682094da6..00abf1cd9a 100644
--- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
+++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java
@@ -1,24 +1,23 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-/**
- * To be performed when a WebSocket connection is established.
- */
-public interface ConnectedListener {
-    void connected(final WebSocketSessionInfo sessionInfo);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+public class WebSocketDisconnectedMessage extends WebSocketMessage {
+    public WebSocketDisconnectedMessage(final WebSocketSessionInfo sessionInfo) {
+        super(sessionInfo);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
index 79171de88c..bcce446cd1 100644
--- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
+++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
@@ -77,6 +77,9 @@ public class WebSocketMessageRouter {
     }
 
     public void onWebSocketClose(final String sessionId, final int statusCode, final String reason) {
+        if (processor instanceof ConnectedListener connectedListener) {
+            connectedListener.disconnected(getSessionOrFail(sessionId));
+        }
         sessions.remove(sessionId);
     }
 

Reply via email to