[ https://issues.apache.org/jira/browse/NIFI-3609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990729#comment-15990729 ]

ASF GitHub Bot commented on NIFI-3609:
--------------------------------------

Github user jdye64 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114116475
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -135,27 +177,81 @@ public void stopClient() throws Exception {
     
         @Override
         public void connect(final String clientId) throws IOException {
    +        connect(clientId, null);
    +    }
    +
    +    private void connect(final String clientId, String sessionId) throws IOException {
    +
    +        connectionLock.lock();
     
    -        final WebSocketMessageRouter router;
             try {
    -            router = routers.getRouterOrFail(clientId);
    -        } catch (WebSocketConfigurationException e) {
    -            throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = routers.getRouterOrFail(clientId);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
    +            listener.setSessionId(sessionId);
    +
    +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
    +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
    +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +
    +            final Session session;
    +            try {
    +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    +            } catch (Exception e) {
    +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            }
    +            getLogger().info("Connected, session={}", new Object[]{session});
    +            activeSessions.put(clientId, listener.getSessionId());
    +
    +        } finally {
    +            connectionLock.unlock();
             }
    -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
     
    -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
    -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
    -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +    }
    +
    +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();
     
    -        final Session session;
    +    void maintainSessions() throws Exception {
    +        if (client == null) {
    +            return;
    +        }
    +
    +        connectionLock.lock();
    +
    +        final ComponentLog logger = getLogger();
             try {
    -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    -        } catch (Exception e) {
    -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            // Loop through existing sessions and reconnect.
    +            for (String clientId : activeSessions.keySet()) {
    +                final WebSocketMessageRouter router;
    +                try {
    +                    router = routers.getRouterOrFail(clientId);
    +                } catch (final WebSocketConfigurationException e) {
    +                    if (logger.isDebugEnabled()) {
    +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
    +                    }
    +                    activeSessions.remove(clientId);
    +                    continue;
    +                }
    +
    +                final String sessionId = activeSessions.get(clientId);
    +                // If this session is still alive, do nothing.
    +                if (!router.containsSession(sessionId)) {
    +                    // This session is no longer active, reconnect it.
    +                    // If it fails, the sessionId will remain in activeSessions, and retries later.
    --- End diff --
    
    More of a question: should we limit the number of reconnect attempts
    before the sessionId is simply removed? I could see an argument either
    way, but I'm curious what your thoughts are here.
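
    To make the question concrete, here is a minimal sketch of what a
    bounded retry could look like. It is not part of the pull request:
    ReconnectAttemptTracker, maxAttempts, and every method on the class are
    hypothetical names introduced only for illustration, and the quoted diff
    is cut off before the reconnect call, so how it would be wired in is an
    assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper, not part of the pull request: counts consecutive
 * failed reconnect attempts per clientId so the caller can decide when to
 * stop retrying and drop the stored sessionId.
 */
class ReconnectAttemptTracker {

    private final int maxAttempts;
    private final Map<String, Integer> failures = new ConcurrentHashMap<>();

    ReconnectAttemptTracker(final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /**
     * Records one failed attempt. Returns true once the number of
     * consecutive failures reaches maxAttempts; the counter is reset at
     * that point so the clientId starts fresh if it is registered again.
     */
    boolean recordFailure(final String clientId) {
        final int count = failures.merge(clientId, 1, Integer::sum);
        if (count >= maxAttempts) {
            failures.remove(clientId);
            return true;
        }
        return false;
    }

    /** Clears the counter after a successful reconnect. */
    void recordSuccess(final String clientId) {
        failures.remove(clientId);
    }

    /** Number of consecutive failures currently recorded for the clientId. */
    int failureCount(final String clientId) {
        return failures.getOrDefault(clientId, 0);
    }

    public static void main(final String[] args) {
        final ReconnectAttemptTracker tracker = new ReconnectAttemptTracker(3);
        for (int attempt = 1; attempt <= 3; attempt++) {
            final boolean giveUp = tracker.recordFailure("client-1");
            System.out.println("attempt " + attempt + ", give up: " + giveUp);
        }
    }
}
```

    Assuming maintainSessions() catches the IOException thrown by
    connect(clientId, sessionId), the catch block could call
    recordFailure(clientId) and remove the entry from activeSessions when it
    returns true, while the success path would call recordSuccess(clientId).
    The counterargument still stands: an unbounded retry is what lets the
    processor recover after a long server outage.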


> ConnectWebSocket should be able to reconnect automatically
> ----------------------------------------------------------
>
>                 Key: NIFI-3609
>                 URL: https://issues.apache.org/jira/browse/NIFI-3609
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.1.0
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>
> A WebSocket connection can be kept alive by triggering PutWebSocket
> periodically (typically with an empty message), but that doesn't help when
> the server goes down.
> Currently, if a server is restarted, ConnectWebSocket has to be restarted
> manually to establish a new connection.
> Ideally, this situation should be recovered from automatically. If existing
> session ids can be reused, it'd be even better, as recovery would be
> transparent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
