GitHub user jdye64 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114116475
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -135,27 +177,81 @@ public void stopClient() throws Exception {
     
         @Override
         public void connect(final String clientId) throws IOException {
    +        connect(clientId, null);
    +    }
    +
    +    private void connect(final String clientId, String sessionId) throws IOException {
    +
    +        connectionLock.lock();
     
    -        final WebSocketMessageRouter router;
             try {
    -            router = routers.getRouterOrFail(clientId);
    -        } catch (WebSocketConfigurationException e) {
    -            throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = routers.getRouterOrFail(clientId);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
    +            listener.setSessionId(sessionId);
    +
    +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
    +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
    +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +
    +            final Session session;
    +            try {
    +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    +            } catch (Exception e) {
    +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            }
    +            getLogger().info("Connected, session={}", new Object[]{session});
    +            activeSessions.put(clientId, listener.getSessionId());
    +
    +        } finally {
    +            connectionLock.unlock();
             }
    -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
     
    -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
    -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
    -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +    }
    +
    +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();
     
    -        final Session session;
    +    void maintainSessions() throws Exception {
    +        if (client == null) {
    +            return;
    +        }
    +
    +        connectionLock.lock();
    +
    +        final ComponentLog logger = getLogger();
             try {
    -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    -        } catch (Exception e) {
    -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            // Loop through existing sessions and reconnect.
    +            for (String clientId : activeSessions.keySet()) {
    +                final WebSocketMessageRouter router;
    +                try {
    +                    router = routers.getRouterOrFail(clientId);
    +                } catch (final WebSocketConfigurationException e) {
    +                    if (logger.isDebugEnabled()) {
    +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
    +                    }
    +                    activeSessions.remove(clientId);
    +                    continue;
    +                }
    +
    +                final String sessionId = activeSessions.get(clientId);
    +                // If this session is still alive, do nothing.
    +                if (!router.containsSession(sessionId)) {
    +                    // This session is no longer active, reconnect it.
    +                    // If it fails, the sessionId will remain in activeSessions, and retries later.
    --- End diff --
    
    This is more of a question, but should we limit the number of reconnect
    attempts before the sessionId is simply removed? I could see an argument
    either way, but I'm curious about your thoughts here.
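
    If a cap is wanted, one option is a per-clientId failure counter kept
    next to activeSessions. The sketch below is only an illustration of that
    idea: ReconnectAttemptTracker, recordFailure, recordSuccess and
    failuresFor are hypothetical names, and nothing here is part of NiFi or
    of this pull request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not part of NiFi): counts consecutive failed
 * reconnect attempts per clientId so a caller can stop retrying
 * once a fixed limit is reached.
 */
class ReconnectAttemptTracker {

    private final int maxAttempts;
    private final Map<String, Integer> failedAttempts = new ConcurrentHashMap<>();

    ReconnectAttemptTracker(final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /**
     * Records one failed reconnect attempt.
     *
     * @return true once the limit has been reached, meaning the caller
     *         should discard the clientId instead of retrying again
     */
    boolean recordFailure(final String clientId) {
        // merge() atomically inserts 1 or increments the existing count.
        final int attempts = failedAttempts.merge(clientId, 1, Integer::sum);
        if (attempts >= maxAttempts) {
            // Limit reached: forget the counter so a later fresh connect starts at zero.
            failedAttempts.remove(clientId);
            return true;
        }
        return false;
    }

    /** Resets the counter after a successful reconnect. */
    void recordSuccess(final String clientId) {
        failedAttempts.remove(clientId);
    }

    /** Returns the current number of consecutive failures for the clientId. */
    int failuresFor(final String clientId) {
        return failedAttempts.getOrDefault(clientId, 0);
    }
}
```

    In maintainSessions(), the reconnect branch could call recordFailure(clientId)
    when connect(clientId, sessionId) throws, and remove the entry from
    activeSessions when it returns true. The limit itself would presumably need
    to be a configurable property, since the right value depends on how long an
    outage the flow should tolerate.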

