This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c650db30a RATIS-2408. Add configurable exponential backoff
reconnection for Netty DataStream client. (#1349)
c650db30a is described below
commit c650db30a3c93b158c06fbf57ff179387b477f39
Author: slfan1989 <[email protected]>
AuthorDate: Thu Feb 19 01:36:48 2026 +0800
RATIS-2408. Add configurable exponential backoff reconnection for Netty
DataStream client. (#1349)
---
.../ratis/retry/ExponentialBackoffRetry.java | 14 +++-
.../java/org/apache/ratis/retry/RetryPolicy.java | 26 ++++++
.../org/apache/ratis/netty/NettyConfigKeys.java | 12 +++
.../ratis/netty/client/NettyClientStreamRpc.java | 72 +++++++++++++++--
.../TestNettyClientStreamRpcReconnectBackoff.java | 79 ++++++++++++++++++
...estNettyDataStreamReconnectWithGrpcCluster.java | 93 ++++++++++++++++++++++
6 files changed, 286 insertions(+), 10 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java
b/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java
index d506c85c8..3c9ffbf45 100644
---
a/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java
+++
b/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.retry;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import java.util.Objects;
@@ -31,7 +32,6 @@ import java.util.concurrent.ThreadLocalRandom;
* in the range [s*0.5, s*1.5).
*/
public final class ExponentialBackoffRetry implements RetryPolicy {
-
public static final class Builder {
private Builder() {}
@@ -56,9 +56,7 @@ public final class ExponentialBackoffRetry implements
RetryPolicy {
}
public ExponentialBackoffRetry build() {
- Objects.requireNonNull(baseSleepTime, "baseSleepTime == null");
- return new ExponentialBackoffRetry(baseSleepTime, maxSleepTime,
- maxAttempts);
+ return new ExponentialBackoffRetry(baseSleepTime, maxSleepTime,
maxAttempts);
}
}
@@ -67,6 +65,14 @@ public final class ExponentialBackoffRetry implements
RetryPolicy {
private final int maxAttempts;
private ExponentialBackoffRetry(TimeDuration baseSleepTime, TimeDuration
maxSleepTime, int maxAttempts) {
+ Objects.requireNonNull(baseSleepTime, "baseSleepTime == null");
+ Preconditions.assertTrue(baseSleepTime.isPositive(), () -> "baseSleepTime
= " + baseSleepTime + " <= 0");
+ if (maxSleepTime != null) {
+ Preconditions.assertTrue(maxSleepTime.compareTo(baseSleepTime) >= 0,
+ () -> "maxSleepTime = " + maxSleepTime + " < baseSleepTime = " +
baseSleepTime);
+ }
+ Preconditions.assertTrue(maxAttempts >= 0, () -> "maxAttempts = " +
maxAttempts + " < 0");
+
this.baseSleepTime = baseSleepTime;
this.maxSleepTime = maxSleepTime;
this.maxAttempts = maxAttempts;
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index 1de07f19e..6916858e5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -19,6 +19,9 @@ package org.apache.ratis.retry;
import org.apache.ratis.util.TimeDuration;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
/**
* Policy abstract for retrying.
*/
@@ -72,4 +75,27 @@ public interface RetryPolicy {
* @return the action it should take.
*/
Action handleAttemptFailure(Event event);
+
+ static RetryPolicy parse(String commaSeparated) {
+ Objects.requireNonNull(commaSeparated, "commaSeparated == null");
+ final String[] args = commaSeparated.split(",");
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Failed to parse RetryPolicy:
args.length = "
+ + args.length + " < 1 for " + commaSeparated);
+ }
+ final String classname = args[0].trim();
+ if (classname.equals(ExponentialBackoffRetry.class.getSimpleName())) {
+ if (args.length != 4) {
+ throw new IllegalArgumentException("Failed to parse
ExponentialBackoffRetry: args.length = "
+ + args.length + " != 4 for " + commaSeparated);
+ }
+ return ExponentialBackoffRetry.newBuilder()
+ .setBaseSleepTime(TimeDuration.valueOf(args[1],
TimeUnit.MILLISECONDS))
+ .setMaxSleepTime(TimeDuration.valueOf(args[2],
TimeUnit.MILLISECONDS))
+ .setMaxAttempts(Integer.parseInt(args[3].trim()))
+ .build();
+ }
+ throw new IllegalArgumentException("Failed to parse RetryPolicy: unknown
class "
+ + args[0] + " for " + commaSeparated);
+ }
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index be3ad8ee6..e84cb4eb2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -176,6 +176,18 @@ public interface NettyConfigKeys {
static void setReplyQueueGracePeriod(RaftProperties properties,
TimeDuration timeoutDuration) {
setTimeDuration(properties::setTimeDuration,
REPLY_QUEUE_GRACE_PERIOD_KEY, timeoutDuration);
}
+
+ /** A retry policy in the comma-separated format accepted by RetryPolicy.parse(String). */
+ String RECONNECT_POLICY_KEY = PREFIX + ".reconnect.policy";
+ /** ExponentialBackoffRetry with base sleep 100ms, max sleep 5s and max
attempts 100. */
+ String RECONNECT_POLICY_DEFAULT = "ExponentialBackoffRetry,100ms,5s,100";
+ static String reconnectPolicy(RaftProperties properties) {
+ return properties.get(RECONNECT_POLICY_KEY, RECONNECT_POLICY_DEFAULT);
+ }
+ static void setReconnectPolicy(RaftProperties properties, String
retryPolicy) {
+ properties.set(RECONNECT_POLICY_KEY, retryPolicy);
+ }
+
}
interface Server {
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 5e111daff..44e91d283 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -35,7 +35,9 @@ import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.security.TlsConf;
+import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
@@ -71,6 +73,8 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -136,20 +140,24 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
static class Connection {
- static final TimeDuration RECONNECT = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
+ static final TimeDuration FIVE_HUNDRED_MS = TimeDuration.valueOf(500,
TimeUnit.MILLISECONDS);
private final InetSocketAddress address;
private final WorkerGroupGetter workerGroup;
private final Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier;
+ private final RetryPolicy reconnectPolicy;
/** The {@link ChannelFuture} is null when this connection is closed. */
private final AtomicReference<MemoizedSupplier<ChannelFuture>> ref;
+ private final AtomicBoolean reconnectScheduled = new AtomicBoolean(false);
+ private final AtomicInteger reconnectAttempts = new AtomicInteger();
Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
- Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier) {
+ Supplier<ChannelInitializer<SocketChannel>>
channelInitializerSupplier, RetryPolicy reconnectPolicy) {
this.address = address;
this.workerGroup = workerGroup;
this.channelInitializerSupplier = channelInitializerSupplier;
+ this.reconnectPolicy = reconnectPolicy;
this.ref = new
AtomicReference<>(MemoizedSupplier.valueOf(this::connect));
}
@@ -191,21 +199,47 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
if (!future.isSuccess()) {
scheduleReconnect(Connection.this + " failed", future.cause());
} else {
+ reconnectAttempts.set(0);
LOG.trace("{} succeed.", Connection.this);
}
}
});
}
+ /**
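+ * Schedules a reconnection attempt after the delay given by the reconnect policy.
+ * Does nothing when closed, when a reconnect is already pending, or when retries are exhausted.
+ *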
+ * @param message description of the failure
+ * @param cause the exception that triggered reconnection (may be null)
+ */
void scheduleReconnect(String message, Throwable cause) {
if (isClosed()) {
return;
}
- LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message,
address, RECONNECT);
+ if (!reconnectScheduled.compareAndSet(false, true)) {
+ return;
+ }
+ // Use retry index starting at 0 so the first delay is computed from the base
sleep time (before jitter).
+ final int attempt = reconnectAttempts.getAndIncrement();
+ final RetryPolicy.Action action =
reconnectPolicy.handleAttemptFailure(() -> attempt);
+ if (!action.shouldRetry()) {
+ reconnectScheduled.set(false);
+ LOG.warn("{}: {}; no more retries to {} after attempt {}", this,
message, address, attempt);
+ return;
+ }
+ final TimeDuration delay = action.getSleepTime();
if (cause != null) {
- LOG.warn("", cause);
+ LOG.warn("{}: {}; reconnect to {} in {} for attempt {}",
+ this, message, address, delay, attempt, cause);
+ } else if (delay.compareTo(FIVE_HUNDRED_MS) < 0) {
+ LOG.info("{}: {}; reconnect to {} in {} for attempt {}", this,
message, address, delay, attempt);
+ } else {
+ LOG.warn("{}: {}; reconnect to {} in {} for attempt {}", this,
message, address, delay, attempt);
}
- getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(),
RECONNECT.getUnit());
+ getWorkerGroup().schedule(() -> {
+ reconnectScheduled.set(false);
+ reconnect();
+ }, delay.getDuration(), delay.getUnit());
}
private synchronized ChannelFuture reconnect() {
@@ -313,8 +347,10 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
final InetSocketAddress address =
NetUtils.createSocketAddr(server.getDataStreamAddress());
final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
+ final RetryPolicy reconnectPolicy =
+
RetryPolicy.parse(NettyConfigKeys.DataStream.Client.reconnectPolicy(properties));
this.connection = new Connection(address,
WorkerGroupGetter.newInstance(properties),
- () -> newChannelInitializer(address, sslContext, getClientHandler()));
+ () -> newChannelInitializer(address, sslContext, getClientHandler()),
reconnectPolicy);
}
private ChannelInboundHandler getClientHandler(){
@@ -492,4 +528,28 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
public String toString() {
return name;
}
+
+ // Visible for tests.
+ @VisibleForTesting
+ RetryPolicy getReconnectPolicy() {
+ return connection.reconnectPolicy;
+ }
+
+ // Visible for tests.
+ boolean waitForChannelActive(TimeDuration timeout) {
+ final long deadline = System.nanoTime() +
timeout.toLong(TimeUnit.NANOSECONDS);
+ while (System.nanoTime() < deadline) {
+ final Channel channel = connection.getChannelUninterruptibly();
+ if (channel != null && channel.isActive()) {
+ return true;
+ }
+ try {
+ Thread.sleep(100L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ return false;
+ }
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyClientStreamRpcReconnectBackoff.java
b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyClientStreamRpcReconnectBackoff.java
new file mode 100644
index 000000000..304c488fc
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyClientStreamRpcReconnectBackoff.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.ExponentialBackoffRetry;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestNettyClientStreamRpcReconnectBackoff {
+ @Test
+ public void testReconnectPolicyBackoffRanges() throws Exception {
+ // Use a small base/max to keep the test fast and deterministic in range
checks.
+ final RaftProperties properties = new RaftProperties();
+ final TimeDuration base = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ final TimeDuration max = TimeDuration.valueOf(400, TimeUnit.MILLISECONDS);
+ final int maxAttempts = 5;
+ NettyConfigKeys.DataStream.Client.setReconnectPolicy(properties,
+ "ExponentialBackoffRetry," + base + "," + max + "," + maxAttempts);
+
+ final RaftPeer peer = RaftPeer.newBuilder()
+ .setId("s1")
+ .setDataStreamAddress(new InetSocketAddress("127.0.0.1", 1))
+ .build();
+
+ final NettyClientStreamRpc rpc = new NettyClientStreamRpc(peer, null,
properties);
+ try {
+ // Verify the reconnect policy is exponential and uses the configured
maxAttempts.
+ final RetryPolicy policy = rpc.getReconnectPolicy();
+ assertTrue(policy instanceof ExponentialBackoffRetry);
+ assertFalse(policy.handleAttemptFailure(() ->
maxAttempts).shouldRetry());
+
+ // attempt=0 -> base delay; attempt=1 -> 2x base; attempt=3 -> capped by
max.
+ assertSleepInRange(policy, 0, base, max);
+ assertSleepInRange(policy, 1, base, max);
+ // Attempt 3 should be capped by max sleep time.
+ assertSleepInRange(policy, 3, base, max);
+ } finally {
+ rpc.close();
+ }
+ }
+
+ private static void assertSleepInRange(RetryPolicy policy, int attempt,
TimeDuration base, TimeDuration max) {
+ final RetryPolicy.Action action = policy.handleAttemptFailure(() ->
attempt);
+ assertTrue(action.shouldRetry());
+
+ final long baseMillis = base.toLong(TimeUnit.MILLISECONDS);
+ final long maxMillis = max.toLong(TimeUnit.MILLISECONDS);
+ final long expected = Math.min(maxMillis, baseMillis * (1L << attempt));
+ final long actual = action.getSleepTime().toLong(TimeUnit.MILLISECONDS);
+
+ assertTrue(actual >= expected / 2, "delay too small: " + actual);
+ assertTrue(actual <= expected + expected / 2, "delay too large: " +
actual);
+ }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyDataStreamReconnectWithGrpcCluster.java
b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyDataStreamReconnectWithGrpcCluster.java
new file mode 100644
index 000000000..2be0bc260
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyDataStreamReconnectWithGrpcCluster.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.DataStreamClient;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import
org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
+import
org.apache.ratis.datastream.MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.ExponentialBackoffRetry;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 120)
+public class TestNettyDataStreamReconnectWithGrpcCluster extends BaseTest
+ implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet
{
+ {
+ setStateMachine(MultiDataStreamStateMachine.class);
+ }
+
+ @Test
+ public void testReconnectConfigApplied() throws Exception {
+ final RaftProperties properties = getProperties();
+ final TimeDuration reconnectDelay = TimeDuration.valueOf(200,
TimeUnit.MILLISECONDS);
+ final TimeDuration reconnectMaxDelay = TimeDuration.valueOf(400,
TimeUnit.MILLISECONDS);
+ NettyConfigKeys.DataStream.Client.setReconnectPolicy(properties,
+ "ExponentialBackoffRetry," + reconnectDelay + "," + reconnectMaxDelay
+ ",10");
+
+ runWithNewCluster(1, cluster -> {
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeer primary = cluster.getLeader().getPeer();
+
+ final RaftClient client = cluster.createClient(primary);
+ try {
+ final DataStreamClient dataStreamClient = (DataStreamClient)
client.getDataStreamApi();
+ final NettyClientStreamRpc rpc = (NettyClientStreamRpc)
dataStreamClient.getClientRpc();
+
+ // Verify reconnect configuration is applied.
+ final RetryPolicy policy = rpc.getReconnectPolicy();
+ assertTrue(policy instanceof ExponentialBackoffRetry);
+ assertSleepInRange(policy, 0, reconnectDelay, reconnectMaxDelay);
+ assertSleepInRange(policy, 1, reconnectDelay, reconnectMaxDelay);
+
+ // Verify the data stream channel can be established.
+ assertTrue(rpc.waitForChannelActive(TimeDuration.valueOf(5,
TimeUnit.SECONDS)),
+ "Data stream channel should be active");
+ } finally {
+ IOUtils.cleanup(LOG, client);
+ }
+ });
+ }
+
+ private static void assertSleepInRange(RetryPolicy policy, int attempt,
TimeDuration base, TimeDuration max) {
+ final RetryPolicy.Action action = policy.handleAttemptFailure(() ->
attempt);
+ assertTrue(action.shouldRetry());
+
+ final long baseMillis = base.toLong(TimeUnit.MILLISECONDS);
+ final long maxMillis = max.toLong(TimeUnit.MILLISECONDS);
+ final long expected = Math.min(maxMillis, baseMillis * (1L << attempt));
+ final long actual = action.getSleepTime().toLong(TimeUnit.MILLISECONDS);
+
+ assertTrue(actual >= expected / 2, "delay too small: " + actual);
+ assertTrue(actual <= expected + expected / 2, "delay too large: " +
actual);
+ }
+
+}