This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 46bb7d153d NIFI-9558: ConnectWebSocket leaks connections and
duplicates FlowFiles in incoming connection mode (new PR)
46bb7d153d is described below
commit 46bb7d153d5b17db77fc70042396973cbfa561a9
Author: Lehel <[email protected]>
AuthorDate: Thu Jan 13 00:15:00 2022 +0100
NIFI-9558: ConnectWebSocket leaks connections and duplicates FlowFiles in
incoming connection mode (new PR)
This closes #6176.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../AbstractWebSocketGatewayProcessor.java | 33 +++++++++++-----------
.../processors/websocket/ConnectWebSocket.java | 4 ++-
.../additionalDetails.html | 3 +-
.../processors/websocket/TestConnectWebSocket.java | 8 ++----
4 files changed, 23 insertions(+), 25 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 33129bfbc0..9824c4a6d2 100644
---
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -185,28 +185,27 @@ public abstract class AbstractWebSocketGatewayProcessor
extends AbstractSessionF
}
if (!isProcessorRegisteredToService()) {
- try {
- registerProcessorToService(context, webSocketService ->
onWebSocketServiceReady(webSocketService, context));
- } catch (IOException | WebSocketConfigurationException e) {
- // Deregister processor if it failed so that it can retry next
onTrigger.
- deregister();
- context.yield();
- throw new ProcessException("Failed to register processor to
WebSocket service due to: " + e, e);
- }
-
- } else if (context.hasIncomingConnection()) {
- try {
- onWebSocketServiceReady(webSocketService, context);
- } catch (IOException e) {
- deregister();
- context.yield();
- throw new ProcessException("Failed to renew session and
connect to WebSocket service due to: " + e, e);
- }
+ register(context);
+ } else if (webSocketService instanceof WebSocketClientService &&
context.hasIncomingConnection()) {
+ // Deregister processor to close previous sessions when flowfile
is provided.
+ deregister();
+ register(context);
}
context.yield();//nothing really to do here since handling WebSocket
messages is done at ControllerService.
}
+ private void register(ProcessContext context) {
+ try {
+ registerProcessorToService(context, webSocketService ->
onWebSocketServiceReady(webSocketService, context));
+ } catch (IOException | WebSocketConfigurationException e) {
+ // Deregister processor if it failed so that it can retry next
onTrigger.
+ deregister();
+ context.yield();
+ throw new ProcessException("Failed to register processor to
WebSocket service due to: " + e, e);
+ }
+ }
+
private void enqueueMessage(final WebSocketMessage incomingMessage) {
final ProcessSession session = processSessionFactory.createSession();
diff --git
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
index dfbd9c482c..be53854d7b 100644
---
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
+++
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
@@ -48,7 +48,9 @@ import static
org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.
@TriggerSerially
@CapabilityDescription("Acts as a WebSocket client endpoint to interact with a
remote WebSocket server." +
" FlowFiles are transferred to downstream relationships according to
received message types" +
- " as WebSocket client configured with this processor receives messages
from remote WebSocket server.")
+ " as WebSocket client configured with this processor receives messages
from remote WebSocket server." +
+ " If a new flowfile is passed to the processor, the previous sessions
will be closed and any data being" +
+ " sent will be aborted.")
@WritesAttributes({
@WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket
Controller Service id."),
@WritesAttribute(attribute = ATTR_WS_SESSION_ID, description =
"Established WebSocket session id."),
diff --git
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
index 8c4c2e8135..ecb7c0a176 100644
---
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/resources/docs/org.apache.nifi.processors.websocket.ConnectWebSocket/additionalDetails.html
@@ -35,7 +35,8 @@
<p>
You can define custom websocket headers in the incoming flowfile as
additional attributes. The attribute key
shall start with "header." and continue with they header key. For example:
"header.Authorization". The attribute
- value will be the corresponding header value.
+ value will be the corresponding header value. If a new flowfile is passed
to the processor, the previous sessions will be closed,
+ and any data being sent will be aborted.
<ol>
<li>header.Autorization | Basic base64UserNamePassWord</li>
<li>header.Content-Type | application, audio, example</li>
diff --git
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
index dbefe9befc..01c7b8079b 100644
---
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
+++
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
@@ -44,7 +44,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -52,7 +51,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-public class TestConnectWebSocket extends TestListenWebSocket {
+class TestConnectWebSocket extends TestListenWebSocket {
@Test
public void testSuccess() throws Exception {
@@ -126,7 +125,7 @@ public class TestConnectWebSocket extends
TestListenWebSocket {
}
@Test
- public void testDynamicUrlsParsedFromFlowFileAndAbleToConnect() throws
InitializationException {
+ void testDynamicUrlsParsedFromFlowFileAndAbleToConnect() throws
InitializationException {
// Start websocket server
final int port = NetworkUtils.availablePort();
TestRunner webSocketListener = getListenWebSocket(port);
@@ -163,9 +162,6 @@ public class TestConnectWebSocket extends
TestListenWebSocket {
final List<MockFlowFile> flowFilesForRelationship =
runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
assertEquals(1, flowFilesForRelationship.size());
- final AssertionError assertionError =
assertThrows(AssertionError.class, () -> runner.run(1));
-
assertTrue(assertionError.getCause().getLocalizedMessage().contains("Failed to
renew session and connect to WebSocket service"));
-
runner.stop();
webSocketListener.stop();
}