GitHub user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114281896
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -135,27 +177,81 @@ public void stopClient() throws Exception {
     
         @Override
         public void connect(final String clientId) throws IOException {
    +        connect(clientId, null);
    +    }
    +
    +    private void connect(final String clientId, String sessionId) throws IOException {
    +
    +        connectionLock.lock();
     
    -        final WebSocketMessageRouter router;
             try {
    -            router = routers.getRouterOrFail(clientId);
    -        } catch (WebSocketConfigurationException e) {
    -            throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = routers.getRouterOrFail(clientId);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
    +            listener.setSessionId(sessionId);
    +
    +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
    +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
    +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +
    +            final Session session;
    +            try {
    +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    +            } catch (Exception e) {
    +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            }
    +            getLogger().info("Connected, session={}", new Object[]{session});
    +            activeSessions.put(clientId, listener.getSessionId());
    +
    +        } finally {
    +            connectionLock.unlock();
             }
    -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
     
    -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
    -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
    -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +    }
    +
    +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();
     
    -        final Session session;
    +    void maintainSessions() throws Exception {
    +        if (client == null) {
    +            return;
    +        }
    +
    +        connectionLock.lock();
    +
    +        final ComponentLog logger = getLogger();
             try {
    -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    -        } catch (Exception e) {
    -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            // Loop through existing sessions and reconnect.
    +            for (String clientId : activeSessions.keySet()) {
    +                final WebSocketMessageRouter router;
    +                try {
    +                    router = routers.getRouterOrFail(clientId);
    +                } catch (final WebSocketConfigurationException e) {
    +                    if (logger.isDebugEnabled()) {
    +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
    +                    }
    +                    activeSessions.remove(clientId);
    +                    continue;
    +                }
    +
    +                final String sessionId = activeSessions.get(clientId);
    +                // If this session is stil alive, do nothing.
    +                if (!router.containsSession(sessionId)) {
    +                    // This session is no longer active, reconnect it.
    +                    // If it fails, the sessionId will remain in activeSessions, and retries later.
    --- End diff --
    
    Good point. I think we should keep it running until the user stops the 
processor or the controller service. The goal is to make the WebSocket connection 
succeed. We don't know how long it will take for a connection to recover, 
but as long as the processor is running, we can assume the user wants it to be 
connected again.
    
    While thinking through and testing the above scenarios, I found that 
ConnectWebSocket cannot connect to a WebSocket server if the server is not 
running, and that ConnectWebSocket stays in the 'STARTED' state but doesn't retry 
connecting. I added a de-registering call for when the initial connection attempt 
fails, so that it can retry connecting the next time it is scheduled.
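
A minimal sketch of the de-registering idea described above, not the actual ConnectWebSocket code. The class `RetryOnScheduleSketch`, the `Connector` interface, and `onSchedule` are hypothetical names used only for illustration; the point is that a failed first attempt removes the registration, so the next scheduled run tries again instead of assuming it is already connected.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a processor that registers itself before connecting. */
class RetryOnScheduleSketch {

    /** Hypothetical connector; assumed to throw IOException when the server is down. */
    interface Connector {
        void connect(String clientId) throws IOException;
    }

    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    private final Connector connector;

    RetryOnScheduleSketch(Connector connector) {
        this.connector = connector;
    }

    /** Called on every scheduling pass. Returns true if the client is connected. */
    boolean onSchedule(String clientId) {
        if (!registered.add(clientId)) {
            return true; // already registered by an earlier, successful pass
        }
        try {
            connector.connect(clientId);
            return true;
        } catch (IOException e) {
            // De-register so that the next scheduling pass attempts to connect again.
            registered.remove(clientId);
            return false;
        }
    }

    boolean isRegistered(String clientId) {
        return registered.contains(clientId);
    }
}
```

Without the `registered.remove(clientId)` call, the second pass would return early and the component would look started while never connecting, which matches the behaviour observed above.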
    
    Once it has successfully established a connection and obtained a session id, the 
maintenance activity does its job.
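
For reference, the maintenance pass can be sketched roughly as follows. This is a simplified illustration under assumptions, not the code in the diff: `SessionMaintenanceSketch`, `Reconnector`, and `liveSessionIds` (a stand-in for the router's `containsSession` check) are hypothetical. A failed reconnect leaves the entry in `activeSessions`, so it is retried on the next pass for as long as the component keeps running.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SessionMaintenanceSketch {

    /** Hypothetical reconnect hook; returns the session id of the restored session. */
    interface Reconnector {
        String reconnect(String clientId, String oldSessionId) throws Exception;
    }

    // clientId -> sessionId, as in the activeSessions map of the diff.
    final Map<String, String> activeSessions = new ConcurrentHashMap<>();
    // Stand-in for router.containsSession(sessionId).
    final Set<String> liveSessionIds = ConcurrentHashMap.newKeySet();

    /** Runs one maintenance pass and returns how many sessions were reconnected. */
    int maintain(Reconnector reconnector) {
        int reconnected = 0;
        for (String clientId : activeSessions.keySet()) {
            final String sessionId = activeSessions.get(clientId);
            if (sessionId == null || liveSessionIds.contains(sessionId)) {
                continue; // still alive (or removed concurrently): nothing to do
            }
            try {
                final String restored = reconnector.reconnect(clientId, sessionId);
                activeSessions.put(clientId, restored);
                liveSessionIds.add(restored);
                reconnected++;
            } catch (Exception e) {
                // Keep the entry so that a later pass retries this client.
            }
        }
        return reconnected;
    }
}
```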
    
    Does this sound reasonable?

