[ https://issues.apache.org/jira/browse/NIFI-3609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992635#comment-15992635 ]

ASF GitHub Bot commented on NIFI-3609:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1597#discussion_r114281896
  
    --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
    @@ -135,27 +177,81 @@ public void stopClient() throws Exception {
     
         @Override
         public void connect(final String clientId) throws IOException {
    +        connect(clientId, null);
    +    }
    +
    +    private void connect(final String clientId, String sessionId) throws IOException {
    +
    +        connectionLock.lock();
     
    -        final WebSocketMessageRouter router;
             try {
    -            router = routers.getRouterOrFail(clientId);
    -        } catch (WebSocketConfigurationException e) {
    -            throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = routers.getRouterOrFail(clientId);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due to: "  + e, e);
    +            }
    +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
    +            listener.setSessionId(sessionId);
    +
    +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
    +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
    +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +
    +            final Session session;
    +            try {
    +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    +            } catch (Exception e) {
    +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            }
    +            getLogger().info("Connected, session={}", new Object[]{session});
    +            activeSessions.put(clientId, listener.getSessionId());
    +
    +        } finally {
    +            connectionLock.unlock();
             }
    -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
     
    -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
    -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
    -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
    +    }
    +
    +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();
     
    -        final Session session;
    +    void maintainSessions() throws Exception {
    +        if (client == null) {
    +            return;
    +        }
    +
    +        connectionLock.lock();
    +
    +        final ComponentLog logger = getLogger();
             try {
    -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
    -        } catch (Exception e) {
    -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
    +            // Loop through existing sessions and reconnect.
    +            for (String clientId : activeSessions.keySet()) {
    +                final WebSocketMessageRouter router;
    +                try {
    +                    router = routers.getRouterOrFail(clientId);
    +                } catch (final WebSocketConfigurationException e) {
    +                    if (logger.isDebugEnabled()) {
    +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
    +                    }
    +                    activeSessions.remove(clientId);
    +                    continue;
    +                }
    +
    +                final String sessionId = activeSessions.get(clientId);
    +                // If this session is stil alive, do nothing.
    +                if (!router.containsSession(sessionId)) {
    +                    // This session is no longer active, reconnect it.
    +                    // If it fails, the sessionId will remain in activeSessions, and retries later.
    --- End diff --
    
    Good point. I think we should keep it running until the user stops the 
processor or the controller service. The goal is to get the WebSocket 
connection established. We don't know how long it will take for a connection 
to be recovered, but as long as the processor is running, we can assume the 
user would like it to be connected again.
    
    While thinking through and testing the above scenarios, I found that 
ConnectWebSocket cannot connect to a WebSocket server if the server is not 
running, and ConnectWebSocket stays in the 'STARTED' state without retrying 
the connection. I added a de-registering call for when the initial connection 
attempt fails, so that it can retry connecting the next time it is scheduled.
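A minimal Java sketch of the retry-on-next-schedule idea described above. It is not the actual ConnectWebSocket code: `RetryOnScheduleSketch`, its nested `Connector` interface, and `onTrigger()` are hypothetical names used only for illustration. The assumption is that the component marks itself as registered before connecting and undoes that mark when the initial attempt fails, so the next scheduled run tries again.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical processor-like component; not the real ConnectWebSocket.
class RetryOnScheduleSketch {

    // Hypothetical stand-in for the service call that opens the WebSocket connection.
    interface Connector {
        void connect(String clientId) throws IOException;
    }

    private final Connector connector;
    private final String clientId;
    // True only while a successful registration (and connection) is in place.
    private final AtomicBoolean registered = new AtomicBoolean(false);

    RetryOnScheduleSketch(Connector connector, String clientId) {
        this.connector = connector;
        this.clientId = clientId;
    }

    // Called each time the component is scheduled.
    // Returns true if the component is registered and connected after this call.
    boolean onTrigger() {
        if (registered.get()) {
            // Already connected; nothing to do on this run.
            return true;
        }
        registered.set(true);
        try {
            connector.connect(clientId);
        } catch (IOException e) {
            // Initial attempt failed: de-register so the next scheduled run retries.
            registered.set(false);
        }
        return registered.get();
    }
}
```

With a connector that fails while the server is down, each scheduled call makes one new attempt until one succeeds; after that, further calls return immediately without reconnecting.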
    
    Once it has successfully established a connection and obtained a session 
id, the maintenance activity does its job.
    
    Does this sound reasonable?
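The maintenance behaviour discussed in this comment can be sketched roughly as follows. This is a simplified illustration, not the code in the pull request: `SessionMaintenanceSketch`, `Reconnector`, `register`, `markDropped`, and `isLive` are hypothetical names, and the set of live session ids stands in for the router's `containsSession` check shown in the diff. The assumption is that a failed reconnect leaves the entry in `activeSessions`, so a later pass retries for as long as the component keeps running.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a periodic session maintenance pass.
class SessionMaintenanceSketch {

    // Hypothetical hook that opens a new connection reusing the given session id.
    interface Reconnector {
        void reconnect(String clientId, String sessionId) throws Exception;
    }

    // clientId -> sessionId for every session that should be kept connected.
    private final Map<String, String> activeSessions = new ConcurrentHashMap<>();
    // Session ids currently backed by a live connection (stand-in for the router).
    private final Set<String> liveSessionIds = ConcurrentHashMap.newKeySet();
    private final ReentrantLock connectionLock = new ReentrantLock();
    private final Reconnector reconnector;

    SessionMaintenanceSketch(Reconnector reconnector) {
        this.reconnector = reconnector;
    }

    void register(String clientId, String sessionId) {
        activeSessions.put(clientId, sessionId);
        liveSessionIds.add(sessionId);
    }

    // Simulates the connection behind a session going away (e.g. server restart).
    void markDropped(String sessionId) {
        liveSessionIds.remove(sessionId);
    }

    boolean isLive(String sessionId) {
        return liveSessionIds.contains(sessionId);
    }

    // One maintenance pass; returns how many sessions were reconnected.
    int maintainSessions() {
        connectionLock.lock();
        try {
            int reconnected = 0;
            for (Map.Entry<String, String> entry : activeSessions.entrySet()) {
                final String sessionId = entry.getValue();
                if (liveSessionIds.contains(sessionId)) {
                    continue; // Session is still alive, nothing to do.
                }
                try {
                    reconnector.reconnect(entry.getKey(), sessionId);
                    liveSessionIds.add(sessionId);
                    reconnected++;
                } catch (Exception e) {
                    // Keep the entry in activeSessions so the next pass retries.
                }
            }
            return reconnected;
        } finally {
            connectionLock.unlock();
        }
    }
}
```

Calling `maintainSessions()` on a fixed schedule gives the "keep retrying until the user stops it" behaviour: a pass that fails changes nothing, and the first pass after the server comes back restores the session under its original id.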


> ConnectWebSocket should be able to reconnect automatically
> ----------------------------------------------------------
>
>                 Key: NIFI-3609
>                 URL: https://issues.apache.org/jira/browse/NIFI-3609
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.1.0
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>
> A WebSocket connection can be extended by triggering PutWebSocket periodically 
> (typically with an empty message), but that doesn't help when the server goes down.
> Currently, if a server is restarted, ConnectWebSocket has to be restarted 
> manually to establish a new connection.
> Ideally, this situation should be recovered from automatically. If existing 
> session ids can be reused, it'd be even better, as recovery would be transparent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
