This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 8aef8338f9 NIFI-12092 Add backoff parameters to JettyWebsocketClient reconnect
8aef8338f9 is described below
commit 8aef8338f927ce4acfee72018abbbe000201423f
Author: Lehel Boer <[email protected]>
AuthorDate: Wed Sep 20 00:55:24 2023 +0200
NIFI-12092 Add backoff parameters to JettyWebsocketClient reconnect
This closes #7761.
Signed-off-by: Tamas Palfy <[email protected]>
(cherry picked from commit 7e4ca9365fd781254597fa6c825bfb8223328c33)
---
.../nifi/websocket/jetty/JettyWebSocketClient.java | 66 ++++++++++++++++------
1 file changed, 48 insertions(+), 18 deletions(-)
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index 4ccf2f67cf..963afb97b8 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.websocket.jetty;
-import org.apache.nifi.websocket.jetty.dto.SessionInfo;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -35,6 +34,8 @@ import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
+import org.apache.nifi.websocket.jetty.dto.SessionInfo;
+import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.http.HttpHeader;
@@ -42,7 +43,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
-import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import java.io.IOException;
import java.net.URI;
@@ -60,6 +60,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"WebSocket", "Jetty", "client"})
@@ -186,6 +187,8 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
+ private static final int INITIAL_BACKOFF_MILLIS = 100;
+ private static final int MAXIMUM_BACKOFF_MILLIS = 3200;
private static final List<PropertyDescriptor> properties;
static {
@@ -210,6 +213,7 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
private WebSocketClient client;
private URI webSocketUri;
private long connectionTimeoutMillis;
+ private int connectCount;
private volatile ScheduledExecutorService sessionMaintenanceScheduler;
private ConfigurationContext configurationContext;
protected String authorizationHeader;
@@ -223,6 +227,9 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
@Override
public void startClient(final ConfigurationContext context) throws
Exception {
configurationContext = context;
+
+ connectCount =
configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
+
final SSLContextService sslService =
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
SslContextFactory sslContextFactory = null;
if (sslService != null) {
@@ -359,30 +366,49 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
request.setHeader(HttpHeader.AUTHORIZATION.asString(),
authorizationHeader);
}
- final int connectCount =
configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
+ final Session session = attemptConnection(listener, request,
connectCount);
- Session session = null;
- for (int i = 0; i < connectCount; i++) {
- final Future<Session> connect =
createWebsocketSession(listener, request);
- getLogger().info("Connecting to : {}", webSocketUri);
- try {
- session = connect.get(connectionTimeoutMillis,
TimeUnit.MILLISECONDS);
- break;
- } catch (Exception e) {
- if (i == connectCount - 1) {
- throw new IOException("Failed to connect " +
webSocketUri + " due to: " + e, e);
- } else {
- getLogger().warn("Failed to connect to {},
reconnection attempt {}", webSocketUri, i + 1);
- }
- }
- }
getLogger().info("Connected, session={}", session);
activeSessions.put(clientId, new
SessionInfo(listener.getSessionId(), flowFileAttributes));
} finally {
connectionLock.unlock();
}
+ }
+ private Session attemptConnection(RoutingWebSocketListener listener,
ClientUpgradeRequest request, int connectCount) throws IOException {
+ int backoffMillis = INITIAL_BACKOFF_MILLIS;
+ int backoffJitterMillis;
+ for (int i = 0; i < connectCount; i++) {
+ backoffJitterMillis = (int) (INITIAL_BACKOFF_MILLIS *
getBackoffJitter(-0.2, 0.2));
+ final Future<Session> connect = createWebsocketSession(listener,
request);
+ getLogger().info("Connecting to : {}", webSocketUri);
+ try {
+ return connect.get(connectionTimeoutMillis,
TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ getLogger().warn("Connection attempt to {} timed out",
webSocketUri);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ final String errorMessage = String.format("Thread was
interrupted while attempting to connect to %s", webSocketUri);
+ throw new ProcessException(errorMessage, e);
+ } catch (Exception e) {
+ getLogger().warn("Failed to connect to {}, reconnection
attempt {}", webSocketUri, i + 1, e);
+ }
+
+ if (i < connectCount - 1) {
+ final int sleepTime = backoffMillis + backoffJitterMillis;
+ try {
+ getLogger().info("Sleeping {} ms before new connection
attempt.", sleepTime);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ final String errorMessage = String.format("Thread was
interrupted while reconnecting to %s with %s backoffMillis", webSocketUri,
sleepTime);
+ throw new ProcessException(errorMessage, e);
+ }
+ backoffMillis = Math.min(backoffMillis * 2,
MAXIMUM_BACKOFF_MILLIS);
+ }
+ }
+        throw new IOException("Failed to connect " + webSocketUri + " after " + connectCount + " attempts");
}
Future<Session> createWebsocketSession(RoutingWebSocketListener listener,
ClientUpgradeRequest request) throws IOException {
@@ -433,4 +459,8 @@ public class JettyWebSocketClient extends
AbstractJettyWebSocketService implemen
public String getTargetUri() {
return webSocketUri.toString();
}
+
+ public double getBackoffJitter(final double min, final double max) {
+ return Math.random() * (max - min) + min;
+ }
}