This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e4cb5a3615 NIFI-14013 Add disconnected Relationship to ConnectWebSocket (#9533)
e4cb5a3615 is described below
commit e4cb5a36159165c6c0aea2150b2c251519ecb11f
Author: POliveNokia <[email protected]>
AuthorDate: Sat Dec 14 14:18:53 2024 +0000
NIFI-14013 Add disconnected Relationship to ConnectWebSocket (#9533)
Signed-off-by: David Handermann <[email protected]>
---
.../AbstractWebSocketGatewayProcessor.java | 17 +++++++
.../processors/websocket/TestConnectWebSocket.java | 53 +++++++++++++++++++++-
.../apache/nifi/websocket/ConnectedListener.java | 1 +
...ener.java => WebSocketDisconnectedMessage.java} | 47 ++++++++++---------
.../nifi/websocket/WebSocketMessageRouter.java | 3 ++
5 files changed, 96 insertions(+), 25 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 784745465c..55921de2b3 100644
---
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -35,6 +35,7 @@ import org.apache.nifi.websocket.TextMessageConsumer;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketConnectedMessage;
+import org.apache.nifi.websocket.WebSocketDisconnectedMessage;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketService;
import org.apache.nifi.websocket.WebSocketSessionInfo;
@@ -67,6 +68,12 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
.description("The WebSocket session is established")
.build();
+ public static final Relationship REL_DISCONNECTED = new Relationship.Builder() // target relationship for FlowFiles produced from a WebSocketDisconnectedMessage
+ .name("disconnected")
+ .description("The WebSocket session is disconnected")
+ .autoTerminateDefault(true) // auto-terminated by default; presumably so existing flows need no new connection - TODO confirm
+ .build();
+
public static final Relationship REL_MESSAGE_TEXT = new
Relationship.Builder()
.name("text message")
.description("The WebSocket text message output")
@@ -92,6 +99,7 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
static Set<Relationship> getAbstractRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_CONNECTED);
+ relationships.add(REL_DISCONNECTED);
relationships.add(REL_MESSAGE_TEXT);
relationships.add(REL_MESSAGE_BINARY);
return relationships;
@@ -114,6 +122,13 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
enqueueMessage(message);
}
+ @Override // presumably implements ConnectedListener.disconnected, which WebSocketMessageRouter.onWebSocketClose calls - confirm
+ public void disconnected(WebSocketSessionInfo sessionInfo) {
+ final WebSocketMessage message = new WebSocketDisconnectedMessage(sessionInfo); // typed marker; the transfer logic routes this type to REL_DISCONNECTED
+ sessionInfo.setTransitUri(getTransitUri(sessionInfo)); // set on the session info after the message has been constructed from it
+ enqueueMessage(message);
+ }
+
@Override
public void consume(WebSocketSessionInfo sessionInfo, String messageStr) {
final WebSocketMessage message = new WebSocketMessage(sessionInfo);
@@ -252,6 +267,8 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
if (incomingMessage instanceof WebSocketConnectedMessage) {
session.transfer(messageFlowFile, REL_CONNECTED);
+ } else if (incomingMessage instanceof
WebSocketDisconnectedMessage) {
+ session.transfer(messageFlowFile, REL_DISCONNECTED);
} else {
switch (Objects.requireNonNull(messageType)) {
case TEXT:
diff --git
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
index 3cc237c614..170baa7082 100644
---
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
+++
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
@@ -91,6 +91,7 @@ class TestConnectWebSocket extends TestListenWebSocket {
processor.consume(webSocketSession, binaryMessage, 0,
binaryMessage.length);
processor.consume(webSocketSession, binaryMessage, 0,
binaryMessage.length);
processor.consume(webSocketSession, binaryMessage, 0,
binaryMessage.length);
+ processor.disconnected(webSocketSession);
return null;
}).when(service).connect(endpointId);
runner.addControllerService(serviceId, service);
@@ -119,7 +120,7 @@ class TestConnectWebSocket extends TestListenWebSocket {
binaryFlowFiles.forEach(ff -> assertFlowFile(webSocketSession,
serviceId, endpointId, ff, WebSocketMessage.Type.BINARY));
final List<ProvenanceEventRecord> provenanceEvents =
sharedSessionState.getProvenanceEvents();
- assertEquals(6, provenanceEvents.size());
+ assertEquals(7, provenanceEvents.size());
assertTrue(provenanceEvents.stream().allMatch(event ->
ProvenanceEventType.RECEIVE.equals(event.getEventType())));
}
@@ -201,6 +202,56 @@ class TestConnectWebSocket extends TestListenWebSocket {
runner.stop();
}
+ @Test
+ void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws InitializationException { // end-to-end: real Jetty server + client, connect then disconnect
+ // Start websocket server
+ final TestRunner webSocketListener = TestRunners.newTestRunner(ListenWebSocket.class);
+
+ final String serverId = "ws-server-service";
+ JettyWebSocketServer server = new JettyWebSocketServer();
+ webSocketListener.addControllerService(serverId, server);
+ webSocketListener.setProperty(server, JettyWebSocketServer.LISTEN_PORT, "0"); // presumably "0" requests an ephemeral port; the real port is read back below - confirm
+ webSocketListener.enableControllerService(server);
+
+ webSocketListener.setProperty(ListenWebSocket.PROP_WEBSOCKET_SERVER_SERVICE, serverId);
+ webSocketListener.setProperty(ListenWebSocket.PROP_SERVER_URL_PATH, "/test");
+
+ webSocketListener.run(1, false); // second argument false: run once without stopping the listener afterwards
+ final int listeningPort = server.getListeningPort();
+
+ final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class); // client-side processor under test
+
+ final String clientId = "ws-service";
+ final String endpointId = "client-1";
+
+ MockFlowFile flowFile = getFlowFile(); // presumably carries the dynamicUrlPart attribute used in WS_URI below - confirm against getFlowFile()
+ runner.enqueue(flowFile);
+
+ JettyWebSocketClient client = new JettyWebSocketClient();
+
+
+ runner.addControllerService(clientId, client);
+ runner.setProperty(client, JettyWebSocketClient.WS_URI, String.format("ws://localhost:%s/${dynamicUrlPart}", listeningPort)); // ${dynamicUrlPart} is left as an expression for the processor to resolve
+ runner.enableControllerService(client);
+
+ runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_SERVICE, clientId);
+ runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_ID, endpointId);
+
+ runner.run(1, false); // keep the processor running so the disconnect below can still be delivered
+
+ final List<MockFlowFile> flowFilesForConnectedRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
+ assertEquals(1, flowFilesForConnectedRelationship.size()); // exactly one session was established
+
+ final List<MockFlowFile> flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_SUCCESS);
+ assertEquals(1, flowFilesForSuccess.size()); // the enqueued input FlowFile was routed to success
+
+ webSocketListener.disableControllerService(server); // shutting the server down is the trigger for the client-side disconnect
+
+ final List<MockFlowFile> flowFilesForDisconnectedRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_DISCONNECTED);
+ assertEquals(1, flowFilesForDisconnectedRelationship.size()); // NOTE(review): asserted immediately with no wait or second run - confirm the close is delivered synchronously
+
+ runner.stop();
+ }
private MockFlowFile getFlowFile() {
Map<String, String> attributes = new HashMap<>();
diff --git
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
index b682094da6..124606c373 100644
---
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
+++
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
@@ -21,4 +21,5 @@ package org.apache.nifi.websocket;
*/
public interface ConnectedListener {
void connected(final WebSocketSessionInfo sessionInfo);
+ void disconnected(final WebSocketSessionInfo sessionInfo); // NOTE(review): new abstract method - every existing ConnectedListener implementer must now provide it
}
diff --git
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java
similarity index 80%
copy from
nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
copy to
nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java
index b682094da6..00abf1cd9a 100644
---
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
+++
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java
@@ -1,24 +1,23 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket;
-
-/**
- * To be performed when a WebSocket connection is established.
- */
-public interface ConnectedListener {
- void connected(final WebSocketSessionInfo sessionInfo);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+public class WebSocketDisconnectedMessage extends WebSocketMessage { // marker subtype: adds no state; consumers detect a disconnect via instanceof
+ public WebSocketDisconnectedMessage(final WebSocketSessionInfo sessionInfo) { // sessionInfo is passed straight through to the WebSocketMessage constructor
+ super(sessionInfo);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
index 79171de88c..bcce446cd1 100644
---
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
+++
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
@@ -77,6 +77,9 @@ public class WebSocketMessageRouter {
}
public void onWebSocketClose(final String sessionId, final int statusCode, final String reason) { // statusCode and reason are not used in this body
+ if (processor instanceof ConnectedListener connectedListener) {
+ connectedListener.disconnected(getSessionOrFail(sessionId)); // look the session up and notify the listener before it is dropped from the map
+ }
sessions.remove(sessionId); // NOTE(review): not reached if the lookup or the listener call above throws - confirm the session should stay registered in that case
}