This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7e4ca9365f NIFI-12092 Add backoff parameters to JettyWebsocketClient
reconnect
7e4ca9365f is described below
commit 7e4ca9365fd781254597fa6c825bfb8223328c33
Author: Lehel Boer <[email protected]>
AuthorDate: Wed Sep 20 01:55:24 2023 +0300
NIFI-12092 Add backoff parameters to JettyWebsocketClient reconnect
This closes #7761.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../nifi/websocket/jetty/JettyWebSocketClient.java | 67 ++++++++++++++++------
1 file changed, 48 insertions(+), 19 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index fb224bbbea..df1c6913ed 100644
---
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.websocket.jetty;
-import org.apache.nifi.websocket.jetty.dto.SessionInfo;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -36,6 +35,8 @@ import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
+import org.apache.nifi.websocket.jetty.dto.SessionInfo;
+import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
@@ -46,7 +47,6 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
-import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@@ -65,6 +65,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"WebSocket", "Jetty", "client"})
@@ -165,7 +166,7 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
.displayName("Custom Authorization")
.description(
"Configures a custom HTTP Authorization Header as
described in RFC 7235 Section 4.2." +
- " Setting a custom Authorization Header excludes
configuring the User Name and User Password properties for Basic
Authentication.")
+ " Setting a custom Authorization Header excludes
configuring the User Name and User Password properties for Basic
Authentication.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -191,6 +192,8 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
+ private static final int INITIAL_BACKOFF_MILLIS = 100;
+ private static final int MAXIMUM_BACKOFF_MILLIS = 3200;
private static final List<PropertyDescriptor> properties;
static {
@@ -215,6 +218,7 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
private WebSocketClient client;
private URI webSocketUri;
private long connectionTimeoutMillis;
+ private int connectCount;
private volatile ScheduledExecutorService sessionMaintenanceScheduler;
private ConfigurationContext configurationContext;
protected String authorizationHeader;
@@ -229,6 +233,8 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
public void startClient(final ConfigurationContext context) throws
Exception {
configurationContext = context;
+ connectCount =
configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
+
final HttpClient httpClient;
final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
if (sslContextService == null) {
@@ -370,30 +376,49 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
request.setHeader(HttpHeader.AUTHORIZATION.asString(),
authorizationHeader);
}
- final int connectCount =
configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
+ final Session session = attemptConnection(listener, request,
connectCount);
- Session session = null;
- for (int i = 0; i < connectCount; i++) {
- final Future<Session> connect =
createWebsocketSession(listener, request);
- getLogger().info("Connecting to : {}", webSocketUri);
- try {
- session = connect.get(connectionTimeoutMillis,
TimeUnit.MILLISECONDS);
- break;
- } catch (Exception e) {
- if (i == connectCount - 1) {
- throw new IOException("Failed to connect " +
webSocketUri + " due to: " + e, e);
- } else {
- getLogger().warn("Failed to connect to {},
reconnection attempt {}", webSocketUri, i + 1);
- }
- }
- }
getLogger().info("Connected, session={}", session);
activeSessions.put(clientId, new
SessionInfo(listener.getSessionId(), flowFileAttributes));
} finally {
connectionLock.unlock();
}
+ }
+ private Session attemptConnection(RoutingWebSocketListener listener,
ClientUpgradeRequest request, int connectCount) throws IOException {
+ int backoffMillis = INITIAL_BACKOFF_MILLIS;
+ int backoffJitterMillis;
+ for (int i = 0; i < connectCount; i++) {
+ backoffJitterMillis = (int) (INITIAL_BACKOFF_MILLIS *
getBackoffJitter(-0.2, 0.2));
+ final Future<Session> connect = createWebsocketSession(listener,
request);
+ getLogger().info("Connecting to : {}", webSocketUri);
+ try {
+ return connect.get(connectionTimeoutMillis,
TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ getLogger().warn("Connection attempt to {} timed out",
webSocketUri);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ final String errorMessage = String.format("Thread was
interrupted while attempting to connect to %s", webSocketUri);
+ throw new ProcessException(errorMessage, e);
+ } catch (Exception e) {
+ getLogger().warn("Failed to connect to {}, reconnection
attempt {}", webSocketUri, i + 1, e);
+ }
+
+ if (i < connectCount - 1) {
+ final int sleepTime = backoffMillis + backoffJitterMillis;
+ try {
+ getLogger().info("Sleeping {} ms before new connection
attempt.", sleepTime);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ final String errorMessage = String.format("Thread was
interrupted while reconnecting to %s with %s backoffMillis", webSocketUri,
sleepTime);
+ throw new ProcessException(errorMessage, e);
+ }
+ backoffMillis = Math.min(backoffMillis * 2,
MAXIMUM_BACKOFF_MILLIS);
+ }
+ }
+ throw new IOException("Failed to connect " + webSocketUri + " after "
+ connectCount + " attempts");
}
Future<Session> createWebsocketSession(RoutingWebSocketListener listener,
ClientUpgradeRequest request) throws IOException {
@@ -453,4 +478,8 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
policy.setMaxTextMessageSize(maxTextMessageSize);
policy.setMaxBinaryMessageSize(maxBinaryMessageSize);
}
+
+ public double getBackoffJitter(final double min, final double max) {
+ return Math.random() * (max - min) + min;
+ }
}