This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e4ca9365f NIFI-12092 Add backoff parameters to JettyWebsocketClient 
reconnect
7e4ca9365f is described below

commit 7e4ca9365fd781254597fa6c825bfb8223328c33
Author: Lehel Boer <[email protected]>
AuthorDate: Wed Sep 20 01:55:24 2023 +0300

    NIFI-12092 Add backoff parameters to JettyWebsocketClient reconnect
    
    This closes #7761.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../nifi/websocket/jetty/JettyWebSocketClient.java | 67 ++++++++++++++++------
 1 file changed, 48 insertions(+), 19 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index fb224bbbea..df1c6913ed 100644
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.websocket.jetty;
 
-import org.apache.nifi.websocket.jetty.dto.SessionInfo;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -36,6 +35,8 @@ import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.websocket.WebSocketClientService;
 import org.apache.nifi.websocket.WebSocketConfigurationException;
 import org.apache.nifi.websocket.WebSocketMessageRouter;
+import org.apache.nifi.websocket.jetty.dto.SessionInfo;
+import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.HttpProxy;
 import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
@@ -46,7 +47,6 @@ import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
 import org.eclipse.jetty.websocket.client.WebSocketClient;
-import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
@@ -65,6 +65,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 
 @Tags({"WebSocket", "Jetty", "client"})
@@ -165,7 +166,7 @@ public class JettyWebSocketClient extends 
AbstractJettyWebSocketService implemen
             .displayName("Custom Authorization")
             .description(
                     "Configures a custom HTTP Authorization Header as 
described in RFC 7235 Section 4.2." +
-                    " Setting a custom Authorization Header excludes 
configuring the User Name and User Password properties for Basic 
Authentication.")
+                            " Setting a custom Authorization Header excludes 
configuring the User Name and User Password properties for Basic 
Authentication.")
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -191,6 +192,8 @@ public class JettyWebSocketClient extends 
AbstractJettyWebSocketService implemen
             .addValidator(StandardValidators.PORT_VALIDATOR)
             .build();
 
+    private static final int INITIAL_BACKOFF_MILLIS = 100;
+    private static final int MAXIMUM_BACKOFF_MILLIS = 3200;
     private static final List<PropertyDescriptor> properties;
 
     static {
@@ -215,6 +218,7 @@ public class JettyWebSocketClient extends 
AbstractJettyWebSocketService implemen
     private WebSocketClient client;
     private URI webSocketUri;
     private long connectionTimeoutMillis;
+    private int connectCount;
     private volatile ScheduledExecutorService sessionMaintenanceScheduler;
     private ConfigurationContext configurationContext;
     protected String authorizationHeader;
@@ -229,6 +233,8 @@ public class JettyWebSocketClient extends 
AbstractJettyWebSocketService implemen
     public void startClient(final ConfigurationContext context) throws 
Exception {
         configurationContext = context;
 
+        connectCount = 
configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
+
         final HttpClient httpClient;
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
         if (sslContextService == null) {
@@ -370,30 +376,49 @@ public class JettyWebSocketClient extends 
AbstractJettyWebSocketService implemen
                 request.setHeader(HttpHeader.AUTHORIZATION.asString(), 
authorizationHeader);
             }
 
-            final int connectCount = 
configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
+            final Session session = attemptConnection(listener, request, 
connectCount);
 
-            Session session = null;
-            for (int i = 0; i < connectCount; i++) {
-                final Future<Session> connect = 
createWebsocketSession(listener, request);
-                getLogger().info("Connecting to : {}", webSocketUri);
-                try {
-                    session = connect.get(connectionTimeoutMillis, 
TimeUnit.MILLISECONDS);
-                    break;
-                } catch (Exception e) {
-                    if (i == connectCount - 1) {
-                        throw new IOException("Failed to connect " + 
webSocketUri + " due to: " + e, e);
-                    } else {
-                        getLogger().warn("Failed to connect to {}, 
reconnection attempt {}", webSocketUri, i + 1);
-                    }
-                }
-            }
             getLogger().info("Connected, session={}", session);
             activeSessions.put(clientId, new 
SessionInfo(listener.getSessionId(), flowFileAttributes));
 
         } finally {
             connectionLock.unlock();
         }
+    }
 
+    private Session attemptConnection(RoutingWebSocketListener listener, 
ClientUpgradeRequest request, int connectCount) throws IOException {
+        int backoffMillis = INITIAL_BACKOFF_MILLIS;
+        int backoffJitterMillis;
+        for (int i = 0; i < connectCount; i++) {
+            backoffJitterMillis = (int) (INITIAL_BACKOFF_MILLIS * 
getBackoffJitter(-0.2, 0.2));
+            final Future<Session> connect = createWebsocketSession(listener, 
request);
+            getLogger().info("Connecting to : {}", webSocketUri);
+            try {
+                return connect.get(connectionTimeoutMillis, 
TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                getLogger().warn("Connection attempt to {} timed out", 
webSocketUri);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                final String errorMessage = String.format("Thread was 
interrupted while attempting to connect to %s", webSocketUri);
+                throw new ProcessException(errorMessage, e);
+            } catch (Exception e) {
+                getLogger().warn("Failed to connect to {}, reconnection 
attempt {}", webSocketUri, i + 1, e);
+            }
+
+            if (i < connectCount - 1) {
+                final int sleepTime = backoffMillis + backoffJitterMillis;
+                try {
+                    getLogger().info("Sleeping {} ms before new connection 
attempt.", sleepTime);
+                    Thread.sleep(sleepTime);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    final String errorMessage = String.format("Thread was 
interrupted while reconnecting to %s with %s backoffMillis", webSocketUri, 
sleepTime);
+                    throw new ProcessException(errorMessage, e);
+                }
+                backoffMillis = Math.min(backoffMillis * 2, 
MAXIMUM_BACKOFF_MILLIS);
+            }
+        }
+        throw new IOException("Failed to connect " + webSocketUri + " after " 
+ connectCount + " attempts");
     }
 
     Future<Session> createWebsocketSession(RoutingWebSocketListener listener, 
ClientUpgradeRequest request) throws IOException {
@@ -453,4 +478,8 @@ public class JettyWebSocketClient extends 
AbstractJettyWebSocketService implemen
         policy.setMaxTextMessageSize(maxTextMessageSize);
         policy.setMaxBinaryMessageSize(maxBinaryMessageSize);
     }
+
+    public double getBackoffJitter(final double min, final double max) {
+        return Math.random() * (max - min) + min;
+    }
 }

Reply via email to