tpalfy commented on a change in pull request #5130:
URL: https://github.com/apache/nifi/pull/5130#discussion_r656314776
##########
File path:
nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
##########
@@ -118,12 +121,20 @@ public void consume(WebSocketSessionInfo sessionInfo,
byte[] payload, int offset
// @OnScheduled can not report error messages well on bulletin since it's
an async method.
// So, let's do it in onTrigger().
- public void onWebSocketServiceReady(final WebSocketService
webSocketService) throws IOException {
-
+ public void onWebSocketServiceReady(final WebSocketService
webSocketService, final ProcessContext context) throws IOException {
if (webSocketService instanceof WebSocketClientService) {
// If it's a ws client, then connect to the remote here.
// Otherwise, ws server is already started at
WebSocketServerService
- ((WebSocketClientService) webSocketService).connect(endpointId);
+ WebSocketClientService webSocketClientService =
(WebSocketClientService) webSocketService;
+ if (context.hasIncomingConnection()) {
+ final ProcessSession session =
processSessionFactory.createSession();
+ final FlowFile flowFile = session.get();
+ final Map<String, String> attributes =
flowFile.getAttributes();
+ session.remove(flowFile);
+ webSocketClientService.connect(endpointId, attributes);
Review comment:
```suggestion
try {
webSocketClientService.connect(endpointId, attributes);
session.remove(flowFile);
session.commitAsync();
} catch (IOException | RuntimeException e) {
session.rollback(true);
throw e;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]