[ https://issues.apache.org/jira/browse/NIFI-3609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990729#comment-15990729 ]
ASF GitHub Bot commented on NIFI-3609:
--------------------------------------
Github user jdye64 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1597#discussion_r114116475
--- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
@@ -135,27 +177,81 @@ public void stopClient() throws Exception {
@Override
public void connect(final String clientId) throws IOException {
+ connect(clientId, null);
+ }
+
+ private void connect(final String clientId, String sessionId) throws IOException {
+
+ connectionLock.lock();
- final WebSocketMessageRouter router;
try {
- router = routers.getRouterOrFail(clientId);
- } catch (WebSocketConfigurationException e) {
- throw new IllegalStateException("Failed to get router due to: " + e, e);
+ final WebSocketMessageRouter router;
+ try {
+ router = routers.getRouterOrFail(clientId);
+ } catch (WebSocketConfigurationException e) {
+ throw new IllegalStateException("Failed to get router due to: " + e, e);
+ }
+ final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
+ listener.setSessionId(sessionId);
+
+ final ClientUpgradeRequest request = new ClientUpgradeRequest();
+ final Future<Session> connect = client.connect(listener, webSocketUri, request);
+ getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
+
+ final Session session;
+ try {
+ session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
+ }
+ getLogger().info("Connected, session={}", new Object[]{session});
+ activeSessions.put(clientId, listener.getSessionId());
+
+ } finally {
+ connectionLock.unlock();
}
- final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
- final ClientUpgradeRequest request = new ClientUpgradeRequest();
- final Future<Session> connect = client.connect(listener, webSocketUri, request);
- getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
+ }
+
+ private Map<String, String> activeSessions = new ConcurrentHashMap<>();
- final Session session;
+ void maintainSessions() throws Exception {
+ if (client == null) {
+ return;
+ }
+
+ connectionLock.lock();
+
+ final ComponentLog logger = getLogger();
try {
- session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
+ // Loop through existing sessions and reconnect.
+ for (String clientId : activeSessions.keySet()) {
+ final WebSocketMessageRouter router;
+ try {
+ router = routers.getRouterOrFail(clientId);
+ } catch (final WebSocketConfigurationException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
+ }
+ activeSessions.remove(clientId);
+ continue;
+ }
+
+ final String sessionId = activeSessions.get(clientId);
+ // If this session is still alive, do nothing.
+ if (!router.containsSession(sessionId)) {
+ // This session is no longer active, reconnect it.
+ // If it fails, the sessionId will remain in activeSessions, and retries later.
--- End diff --
More of a question: should we limit the number of times this is attempted
before the sessionId is simply removed? I could see an argument either way,
but I'm curious about your thoughts here.
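To make the question concrete, here is a rough sketch of what a bounded retry
could look like. Everything in it is hypothetical and not part of this PR: the
class name ReconnectAttemptTracker, its methods, and the maxAttempts parameter
are names invented for illustration, and the limit itself would presumably
need to be a processor or service property.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not in the PR): counts consecutive failed reconnect
// attempts per clientId so the caller can give up after a fixed limit.
class ReconnectAttemptTracker {
    private final int maxAttempts;
    private final Map<String, Integer> failedAttempts = new ConcurrentHashMap<>();

    ReconnectAttemptTracker(final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // Records one failed attempt. Returns true once the limit is reached,
    // meaning the caller should stop retrying and drop the session.
    boolean recordFailure(final String clientId) {
        final int attempts = failedAttempts.merge(clientId, 1, Integer::sum);
        if (attempts >= maxAttempts) {
            failedAttempts.remove(clientId);
            return true;
        }
        return false;
    }

    // Resets the counter after a successful reconnect.
    void recordSuccess(final String clientId) {
        failedAttempts.remove(clientId);
    }

    // Number of consecutive failures currently recorded for the clientId.
    int failuresFor(final String clientId) {
        return failedAttempts.getOrDefault(clientId, 0);
    }
}
```

In maintainSessions(), the reconnect call could then be wrapped so that an
IOException leads to recordFailure(clientId), and a true return value leads
to activeSessions.remove(clientId). The counter-argument is that a server
which stays down longer than the limit allows would again require a manual
restart of ConnectWebSocket, which is the situation this issue is trying to
avoid.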
> ConnectWebSocket should be able to reconnect automatically
> ----------------------------------------------------------
>
> Key: NIFI-3609
> URL: https://issues.apache.org/jira/browse/NIFI-3609
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.1.0
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
>
> A WebSocket connection can be extended by triggering PutWebSocket periodically
> (typically with an empty message), but that doesn't help when the server goes down.
> Currently, if a server is restarted, ConnectWebSocket has to be restarted
> manually to establish a new connection.
> Ideally, this situation should be recovered from automatically. If existing
> session ids can be reused, it'd be even better, as recovery would be transparent.