szetszwo commented on code in PR #1349:
URL: https://github.com/apache/ratis/pull/1349#discussion_r2818446822
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -136,20 +141,38 @@ void shutdownGracefully() {
}
static class Connection {
- static final TimeDuration RECONNECT = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
-
private final InetSocketAddress address;
private final WorkerGroupGetter workerGroup;
private final Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier;
+ private final long minReconnectMillis;
+ private final long maxReconnectMillis;
+ private final int maxReconnectAttempts;
+ private final RetryPolicy reconnectPolicy;
/** The {@link ChannelFuture} is null when this connection is closed. */
private final AtomicReference<MemoizedSupplier<ChannelFuture>> ref;
+ private final AtomicBoolean reconnectScheduled = new AtomicBoolean(false);
+ private final AtomicInteger reconnectAttempts = new AtomicInteger();
Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
- Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier) {
+ Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier,
+ TimeDuration reconnectDelay, TimeDuration reconnectMaxDelay, int
reconnectMaxAttempts) {
this.address = address;
this.workerGroup = workerGroup;
this.channelInitializerSupplier = channelInitializerSupplier;
+ this.minReconnectMillis =
reconnectDelay.getUnit().toMillis(reconnectDelay.getDuration());
+ this.maxReconnectMillis =
reconnectMaxDelay.getUnit().toMillis(reconnectMaxDelay.getDuration());
+ this.maxReconnectAttempts = reconnectMaxAttempts;
+ Preconditions.assertTrue(minReconnectMillis > 0, () ->
"minReconnectMillis = " + minReconnectMillis + " <= 0");
+ Preconditions.assertTrue(maxReconnectMillis >= minReconnectMillis,
+ () -> "maxReconnectMillis = " + maxReconnectMillis + " < minReconnectMillis = " + minReconnectMillis);
+ Preconditions.assertTrue(maxReconnectAttempts >= 0,
+ () -> "maxReconnectAttempts = " + maxReconnectAttempts + " < 0");
Review Comment:
Let's move the preconditions to the ExponentialBackoffRetry constructor:
```java
private ExponentialBackoffRetry(TimeDuration baseSleepTime, TimeDuration
maxSleepTime, int maxAttempts) {
Objects.requireNonNull(baseSleepTime, "baseSleepTime == null");
Preconditions.assertTrue(baseSleepTime.isPositive(), () ->
"baseSleepTime = " + baseSleepTime + " <= 0");
Objects.requireNonNull(maxSleepTime, "maxSleepTime == null");
Preconditions.assertTrue(maxSleepTime.compareTo(baseSleepTime) >= 0,
() -> "maxSleepTime = " + maxSleepTime + " < baseSleepTime = " +
baseSleepTime);
Preconditions.assertTrue(maxAttempts >= 0, () -> "maxAttempts = " +
maxAttempts + " < 0");
this.baseSleepTime = baseSleepTime;
this.maxSleepTime = maxSleepTime;
this.maxAttempts = maxAttempts;
}
```
##########
ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java:
##########
@@ -176,6 +176,47 @@ static TimeDuration replyQueueGracePeriod(RaftProperties
properties) {
static void setReplyQueueGracePeriod(RaftProperties properties,
TimeDuration timeoutDuration) {
setTimeDuration(properties::setTimeDuration,
REPLY_QUEUE_GRACE_PERIOD_KEY, timeoutDuration);
}
+
+ /**
+ * Initial delay for reconnect attempts.
+ * The delay doubles on each failure with 0.5x-1.5x jitter.
+ */
+ String RECONNECT_DELAY_KEY = PREFIX + ".reconnect.delay";
+ TimeDuration RECONNECT_DELAY_DEFAULT = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
+ static TimeDuration reconnectDelay(RaftProperties properties) {
+ return
getTimeDuration(properties.getTimeDuration(RECONNECT_DELAY_DEFAULT.getUnit()),
+ RECONNECT_DELAY_KEY, RECONNECT_DELAY_DEFAULT, getDefaultLog());
+ }
+ static void setReconnectDelay(RaftProperties properties, TimeDuration
delay) {
+ setTimeDuration(properties::setTimeDuration, RECONNECT_DELAY_KEY,
delay);
+ }
+
+ /**
+ * Maximum delay for reconnect attempts.
+ * The backoff increases until this upper bound.
+ */
+ String RECONNECT_MAX_DELAY_KEY = PREFIX + ".reconnect.max-delay";
+ TimeDuration RECONNECT_MAX_DELAY_DEFAULT = TimeDuration.valueOf(5,
TimeUnit.SECONDS);
+ static TimeDuration reconnectMaxDelay(RaftProperties properties) {
+ return
getTimeDuration(properties.getTimeDuration(RECONNECT_MAX_DELAY_DEFAULT.getUnit()),
+ RECONNECT_MAX_DELAY_KEY, RECONNECT_MAX_DELAY_DEFAULT,
getDefaultLog());
+ }
+ static void setReconnectMaxDelay(RaftProperties properties, TimeDuration
delay) {
+ setTimeDuration(properties::setTimeDuration, RECONNECT_MAX_DELAY_KEY,
delay);
+ }
+
+ /**
+ * Maximum number of reconnect attempts.
+ * Use {@link Integer#MAX_VALUE} for unlimited attempts.
+ */
+ String RECONNECT_MAX_ATTEMPTS_KEY = PREFIX + ".reconnect.max-attempts";
+ int RECONNECT_MAX_ATTEMPTS_DEFAULT = Integer.MAX_VALUE;
+ static int reconnectMaxAttempts(RaftProperties properties) {
+ return getInt(properties::getInt, RECONNECT_MAX_ATTEMPTS_KEY,
RECONNECT_MAX_ATTEMPTS_DEFAULT, getDefaultLog());
+ }
+ static void setReconnectMaxAttempts(RaftProperties properties, int
maxAttempts) {
+ setInt(properties::setInt, RECONNECT_MAX_ATTEMPTS_KEY, maxAttempts);
+ }
Review Comment:
Let's use a single configuration key so that we may support other `RetryPolicy` implementations later.
```java
/** A retry policy specified in comma separated format. */
String RECONNECT_POLICY_KEY = PREFIX + ".reconnect.policy";
/** ExponentialBackoffRetry with base sleep 100ms, max sleep 5s and max attempts 100. */
String RECONNECT_POLICY_DEFAULT =
"ExponentialBackoffRetry,100ms,5s,100";
static String reconnectPolicy(RaftProperties properties) {
return properties.get(RECONNECT_POLICY_KEY,
RECONNECT_POLICY_DEFAULT);
}
static void setReconnectPolicy(RaftProperties properties, String
retryPolicy) {
properties.set(RECONNECT_POLICY_KEY, retryPolicy);
}
```
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -136,20 +141,38 @@ void shutdownGracefully() {
}
static class Connection {
- static final TimeDuration RECONNECT = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
-
private final InetSocketAddress address;
private final WorkerGroupGetter workerGroup;
private final Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier;
+ private final long minReconnectMillis;
+ private final long maxReconnectMillis;
+ private final int maxReconnectAttempts;
+ private final RetryPolicy reconnectPolicy;
/** The {@link ChannelFuture} is null when this connection is closed. */
private final AtomicReference<MemoizedSupplier<ChannelFuture>> ref;
+ private final AtomicBoolean reconnectScheduled = new AtomicBoolean(false);
+ private final AtomicInteger reconnectAttempts = new AtomicInteger();
Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
- Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier) {
+ Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier,
+ TimeDuration reconnectDelay, TimeDuration reconnectMaxDelay, int
reconnectMaxAttempts) {
this.address = address;
this.workerGroup = workerGroup;
this.channelInitializerSupplier = channelInitializerSupplier;
+ this.minReconnectMillis =
reconnectDelay.getUnit().toMillis(reconnectDelay.getDuration());
+ this.maxReconnectMillis =
reconnectMaxDelay.getUnit().toMillis(reconnectMaxDelay.getDuration());
+ this.maxReconnectAttempts = reconnectMaxAttempts;
+ Preconditions.assertTrue(minReconnectMillis > 0, () ->
"minReconnectMillis = " + minReconnectMillis + " <= 0");
+ Preconditions.assertTrue(maxReconnectMillis >= minReconnectMillis,
+ () -> "maxReconnectMillis = " + maxReconnectMillis + " < minReconnectMillis = " + minReconnectMillis);
+ Preconditions.assertTrue(maxReconnectAttempts >= 0,
+ () -> "maxReconnectAttempts = " + maxReconnectAttempts + " < 0");
+ this.reconnectPolicy = ExponentialBackoffRetry.newBuilder()
+ .setBaseSleepTime(reconnectDelay)
+ .setMaxSleepTime(reconnectMaxDelay)
+ .setMaxAttempts(maxReconnectAttempts)
+ .build();
Review Comment:
Let's pass `reconnectPolicy` to the constructor instead. Then, we may support other `RetryPolicy` implementations in the future.
```java
Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier, RetryPolicy reconnectPolicy) {
this.address = address;
this.workerGroup = workerGroup;
this.channelInitializerSupplier = channelInitializerSupplier;
this.reconnectPolicy = reconnectPolicy;
this.ref = new
AtomicReference<>(MemoizedSupplier.valueOf(this::connect));
}
```
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -191,21 +214,48 @@ public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
scheduleReconnect(Connection.this + " failed", future.cause());
} else {
+ resetReconnectAttempts();
LOG.trace("{} succeed.", Connection.this);
}
}
});
}
+ /**
+ * Schedules a reconnection attempt with exponential backoff and jitter.
+ *
+ * @param message description of the failure
+ * @param cause the exception that triggered reconnection (may be null)
+ */
void scheduleReconnect(String message, Throwable cause) {
if (isClosed()) {
return;
}
- LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message,
address, RECONNECT);
+ if (!reconnectScheduled.compareAndSet(false, true)) {
+ return;
+ }
+ // Use retry index starting at 0 so the first delay equals base sleep time.
+ final int attempt = reconnectAttempts.getAndIncrement();
+ final RetryPolicy.Action action =
reconnectPolicy.handleAttemptFailure(() -> attempt);
+ if (!action.shouldRetry()) {
+ reconnectScheduled.set(false);
+ LOG.warn("{}: {}; no more retries to {} after attempt {}", this,
message, address, attempt);
+ return;
+ }
+ final long delayMillis = Math.max(1L,
action.getSleepTime().toLong(TimeUnit.MILLISECONDS));
+ final TimeDuration delay = TimeDuration.valueOf(delayMillis,
TimeUnit.MILLISECONDS);
Review Comment:
Let's get the delay directly from the action:
```java
final TimeDuration delay = action.getSleepTime();
```
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -191,21 +214,48 @@ public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
scheduleReconnect(Connection.this + " failed", future.cause());
} else {
+ resetReconnectAttempts();
Review Comment:
Let's just set it here and remove the `resetReconnectAttempts()` method.
```
reconnectAttempts.set(0);
```
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -191,21 +214,48 @@ public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
scheduleReconnect(Connection.this + " failed", future.cause());
} else {
+ resetReconnectAttempts();
LOG.trace("{} succeed.", Connection.this);
}
}
});
}
+ /**
+ * Schedules a reconnection attempt with exponential backoff and jitter.
+ *
+ * @param message description of the failure
+ * @param cause the exception that triggered reconnection (may be null)
+ */
void scheduleReconnect(String message, Throwable cause) {
if (isClosed()) {
return;
}
- LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message,
address, RECONNECT);
+ if (!reconnectScheduled.compareAndSet(false, true)) {
+ return;
+ }
+ // Use retry index starting at 0 so the first delay equals base sleep time.
+ final int attempt = reconnectAttempts.getAndIncrement();
+ final RetryPolicy.Action action =
reconnectPolicy.handleAttemptFailure(() -> attempt);
+ if (!action.shouldRetry()) {
+ reconnectScheduled.set(false);
+ LOG.warn("{}: {}; no more retries to {} after attempt {}", this,
message, address, attempt);
+ return;
+ }
+ final long delayMillis = Math.max(1L,
action.getSleepTime().toLong(TimeUnit.MILLISECONDS));
+ final TimeDuration delay = TimeDuration.valueOf(delayMillis,
TimeUnit.MILLISECONDS);
+ if (delayMillis <= 500) {
+ LOG.info("{}: {}; schedule reconnecting to {} in {}", this, message,
address, delay);
+ } else {
+ LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message,
address, delay);
+ }
if (cause != null) {
LOG.warn("", cause);
}
Review Comment:
Also print the attempt (`FIVE_HUNDRED_MS` below denotes a 500 ms `TimeDuration` constant):
```java
if (cause != null) {
LOG.warn("{}: {}; reconnect to {} in {} for attempt {}",
this, message, address, delay, attempt, cause);
} else if (delay.compareTo(FIVE_HUNDRED_MS) < 0) {
LOG.info("{}: {}; reconnect to {} in {} for attempt {}", this,
message, address, delay, attempt);
} else {
LOG.warn("{}: {}; reconnect to {} in {} for attempt {}", this,
message, address, delay, attempt);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]