This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c650db30a RATIS-2408. Add configurable exponential backoff reconnection for Netty DataStream client. (#1349)
c650db30a is described below

commit c650db30a3c93b158c06fbf57ff179387b477f39
Author: slfan1989 <[email protected]>
AuthorDate: Thu Feb 19 01:36:48 2026 +0800

    RATIS-2408. Add configurable exponential backoff reconnection for Netty DataStream client. (#1349)
---
 .../ratis/retry/ExponentialBackoffRetry.java       | 14 +++-
 .../java/org/apache/ratis/retry/RetryPolicy.java   | 26 ++++++
 .../org/apache/ratis/netty/NettyConfigKeys.java    | 12 +++
 .../ratis/netty/client/NettyClientStreamRpc.java   | 72 +++++++++++++++--
 .../TestNettyClientStreamRpcReconnectBackoff.java  | 79 ++++++++++++++++++
 ...estNettyDataStreamReconnectWithGrpcCluster.java | 93 ++++++++++++++++++++++
 6 files changed, 286 insertions(+), 10 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java b/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java
index d506c85c8..3c9ffbf45 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/ExponentialBackoffRetry.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.retry;
 
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.Objects;
@@ -31,7 +32,6 @@ import java.util.concurrent.ThreadLocalRandom;
  * in the range [s*0.5, s*1.5).
  */
 public final class ExponentialBackoffRetry implements RetryPolicy {
-
   public static final class Builder {
 
     private Builder() {}
@@ -56,9 +56,7 @@ public final class ExponentialBackoffRetry implements RetryPolicy {
     }
 
     public ExponentialBackoffRetry build() {
-      Objects.requireNonNull(baseSleepTime, "baseSleepTime == null");
-      return new ExponentialBackoffRetry(baseSleepTime, maxSleepTime,
-          maxAttempts);
+      return new ExponentialBackoffRetry(baseSleepTime, maxSleepTime, maxAttempts);
     }
   }
 
@@ -67,6 +65,14 @@ public final class ExponentialBackoffRetry implements RetryPolicy {
   private final int maxAttempts;
 
   private ExponentialBackoffRetry(TimeDuration baseSleepTime, TimeDuration maxSleepTime, int maxAttempts) {
+    Objects.requireNonNull(baseSleepTime, "baseSleepTime == null");
+    Preconditions.assertTrue(baseSleepTime.isPositive(), () -> "baseSleepTime = " + baseSleepTime + " <= 0");
+    if (maxSleepTime != null) {
+      Preconditions.assertTrue(maxSleepTime.compareTo(baseSleepTime) >= 0,
+          () -> "maxSleepTime = " + maxSleepTime + " < baseSleepTime = " + baseSleepTime);
+    }
+    Preconditions.assertTrue(maxAttempts >= 0, () -> "maxAttempts = " + maxAttempts + " < 0");
+
     this.baseSleepTime = baseSleepTime;
     this.maxSleepTime = maxSleepTime;
     this.maxAttempts = maxAttempts;
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index 1de07f19e..6916858e5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -19,6 +19,9 @@ package org.apache.ratis.retry;
 
 import org.apache.ratis.util.TimeDuration;
 
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Policy abstract for retrying.
  */
@@ -72,4 +75,27 @@ public interface RetryPolicy {
    * @return the action it should take.
    */
   Action handleAttemptFailure(Event event);
+
+  static RetryPolicy parse(String commaSeparated) {
+    Objects.requireNonNull(commaSeparated, "commaSeparated == null");
+    final String[] args = commaSeparated.split(",");
+    if (args.length < 1) {
+      throw new IllegalArgumentException("Failed to parse RetryPolicy: args.length = "
+          + args.length + " < 1 for " + commaSeparated);
+    }
+    final String classname = args[0].trim();
+    if (classname.equals(ExponentialBackoffRetry.class.getSimpleName())) {
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Failed to parse ExponentialBackoffRetry: args.length = "
+            + args.length + " != 4 for " + commaSeparated);
+      }
+      return ExponentialBackoffRetry.newBuilder()
+          .setBaseSleepTime(TimeDuration.valueOf(args[1].trim(), TimeUnit.MILLISECONDS))
+          .setMaxSleepTime(TimeDuration.valueOf(args[2].trim(), TimeUnit.MILLISECONDS))
+          .setMaxAttempts(Integer.parseInt(args[3].trim()))
+          .build();
+    }
+    throw new IllegalArgumentException("Failed to parse RetryPolicy: unknown class "
+        + args[0] + " for " + commaSeparated);
+  }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index be3ad8ee6..e84cb4eb2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -176,6 +176,18 @@ public interface NettyConfigKeys {
       static void setReplyQueueGracePeriod(RaftProperties properties, TimeDuration timeoutDuration) {
         setTimeDuration(properties::setTimeDuration, REPLY_QUEUE_GRACE_PERIOD_KEY, timeoutDuration);
       }
+
+      /** Reconnect retry policy in comma separated format: "ExponentialBackoffRetry,baseSleep,maxSleep,maxAttempts". */
+      String RECONNECT_POLICY_KEY = PREFIX + ".reconnect.policy";
+      /** ExponentialBackoffRetry with base sleep 100ms, max sleep 5s and max attempts 100. */
+      String RECONNECT_POLICY_DEFAULT = "ExponentialBackoffRetry,100ms,5s,100";
+      static String reconnectPolicy(RaftProperties properties) {
+        return properties.get(RECONNECT_POLICY_KEY, RECONNECT_POLICY_DEFAULT);
+      }
+      static void setReconnectPolicy(RaftProperties properties, String retryPolicy) {
+        properties.set(RECONNECT_POLICY_KEY, retryPolicy);
+      }
+
     }
 
     interface Server {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 5e111daff..44e91d283 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -35,7 +35,9 @@ import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.security.TlsConf;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.Channel;
@@ -71,6 +73,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -136,20 +140,24 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   }
 
   static class Connection {
-    static final TimeDuration RECONNECT = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+    static final TimeDuration FIVE_HUNDRED_MS = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
 
     private final InetSocketAddress address;
     private final WorkerGroupGetter workerGroup;
     private final Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier;
+    private final RetryPolicy reconnectPolicy;
 
     /** The {@link ChannelFuture} is null when this connection is closed. */
     private final AtomicReference<MemoizedSupplier<ChannelFuture>> ref;
+    private final AtomicBoolean reconnectScheduled = new AtomicBoolean(false);
+    private final AtomicInteger reconnectAttempts = new AtomicInteger();
 
     Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
-        Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier) {
+        Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier, RetryPolicy reconnectPolicy) {
       this.address = address;
       this.workerGroup = workerGroup;
       this.channelInitializerSupplier = channelInitializerSupplier;
+      this.reconnectPolicy = reconnectPolicy;
       this.ref = new AtomicReference<>(MemoizedSupplier.valueOf(this::connect));
     }
 
@@ -191,21 +199,47 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
               if (!future.isSuccess()) {
                 scheduleReconnect(Connection.this + " failed", future.cause());
               } else {
+                reconnectAttempts.set(0);
                 LOG.trace("{} succeed.", Connection.this);
               }
             }
           });
     }
 
+    /**
+     * Schedules a reconnect after the delay given by the reconnect policy, unless closed, already scheduled, or out of retries.
+     *
+     * @param message description of the failure
+     * @param cause the exception that triggered reconnection (may be null)
+     */
     void scheduleReconnect(String message, Throwable cause) {
       if (isClosed()) {
         return;
       }
-      LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message, address, RECONNECT);
+      if (!reconnectScheduled.compareAndSet(false, true)) {
+        return;
+      }
+      // Attempt numbers start at 0 so the first delay is derived from the base sleep time (before any jitter).
+      final int attempt = reconnectAttempts.getAndIncrement();
+      final RetryPolicy.Action action = reconnectPolicy.handleAttemptFailure(() -> attempt);
+      if (!action.shouldRetry()) {
+        reconnectScheduled.set(false);
+        LOG.warn("{}: {}; no more retries to {} after attempt {}", this, message, address, attempt);
+        return;
+      }
+      final TimeDuration delay = action.getSleepTime();
       if (cause != null) {
-        LOG.warn("", cause);
+        LOG.warn("{}: {}; reconnect to {} in {} for attempt {}",
+            this, message, address, delay, attempt, cause);
+      } else if (delay.compareTo(FIVE_HUNDRED_MS) < 0) {
+        LOG.info("{}: {}; reconnect to {} in {} for attempt {}", this, message, address, delay, attempt);
+      } else {
+        LOG.warn("{}: {}; reconnect to {} in {} for attempt {}", this, message, address, delay, attempt);
       }
-      getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), RECONNECT.getUnit());
+      getWorkerGroup().schedule(() -> {
+        reconnectScheduled.set(false);
+        reconnect();
+      }, delay.getDuration(), delay.getUnit());
     }
 
     private synchronized ChannelFuture reconnect() {
@@ -313,8 +347,10 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
     final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress());
     final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
+    final RetryPolicy reconnectPolicy =
+        RetryPolicy.parse(NettyConfigKeys.DataStream.Client.reconnectPolicy(properties));
     this.connection = new Connection(address, WorkerGroupGetter.newInstance(properties),
-        () -> newChannelInitializer(address, sslContext, getClientHandler()));
+        () -> newChannelInitializer(address, sslContext, getClientHandler()), reconnectPolicy);
   }
 
   private ChannelInboundHandler getClientHandler(){
@@ -492,4 +528,28 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   public String toString() {
     return name;
   }
+
+  /** @return the {@link RetryPolicy} used to schedule reconnects of the underlying connection. */
+  @VisibleForTesting
+  RetryPolicy getReconnectPolicy() {
+    return connection.reconnectPolicy;
+  }
+
+  @VisibleForTesting
+  boolean waitForChannelActive(TimeDuration timeout) {
+    final long deadline = System.nanoTime() + timeout.toLong(TimeUnit.NANOSECONDS);
+    while (System.nanoTime() - deadline < 0) { // overflow-safe nanoTime comparison
+      final Channel channel = connection.getChannelUninterruptibly();
+      if (channel != null && channel.isActive()) {
+        return true;
+      }
+      try {
+        Thread.sleep(100L);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return false;
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyClientStreamRpcReconnectBackoff.java b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyClientStreamRpcReconnectBackoff.java
new file mode 100644
index 000000000..304c488fc
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyClientStreamRpcReconnectBackoff.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.ExponentialBackoffRetry;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestNettyClientStreamRpcReconnectBackoff {
+  @Test
+  public void testReconnectPolicyBackoffRanges() throws Exception {
+    // Use a small base/max to keep the test fast; the range checks below tolerate the [0.5x, 1.5x) jitter.
+    final RaftProperties properties = new RaftProperties();
+    final TimeDuration base = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+    final TimeDuration max = TimeDuration.valueOf(400, TimeUnit.MILLISECONDS);
+    final int maxAttempts = 5;
+    NettyConfigKeys.DataStream.Client.setReconnectPolicy(properties,
+        "ExponentialBackoffRetry," + base + "," + max + "," + maxAttempts);
+
+    final RaftPeer peer = RaftPeer.newBuilder()
+        .setId("s1")
+        .setDataStreamAddress(new InetSocketAddress("127.0.0.1", 1))
+        .build();
+
+    final NettyClientStreamRpc rpc = new NettyClientStreamRpc(peer, null, properties);
+    try {
+      // Verify the reconnect policy is exponential and uses the configured maxAttempts.
+      final RetryPolicy policy = rpc.getReconnectPolicy();
+      assertTrue(policy instanceof ExponentialBackoffRetry);
+      assertFalse(policy.handleAttemptFailure(() -> maxAttempts).shouldRetry());
+
+      // attempt=0 -> base delay; attempt=1 -> 2x base; attempt=3 -> capped by max.
+      assertSleepInRange(policy, 0, base, max);
+      assertSleepInRange(policy, 1, base, max);
+      // Attempt 3 should be capped by max sleep time.
+      assertSleepInRange(policy, 3, base, max);
+    } finally {
+      rpc.close();
+    }
+  }
+
+  private static void assertSleepInRange(RetryPolicy policy, int attempt, TimeDuration base, TimeDuration max) {
+    final RetryPolicy.Action action = policy.handleAttemptFailure(() -> attempt);
+    assertTrue(action.shouldRetry(), "should retry for attempt " + attempt);
+
+    final long baseMillis = base.toLong(TimeUnit.MILLISECONDS);
+    final long maxMillis = max.toLong(TimeUnit.MILLISECONDS);
+    final long expected = Math.min(maxMillis, baseMillis * (1L << attempt));
+    final long actual = action.getSleepTime().toLong(TimeUnit.MILLISECONDS);
+
+    assertTrue(actual >= expected / 2, "delay too small: " + actual + " < " + expected / 2 + " for attempt " + attempt);
+    assertTrue(actual <= expected + expected / 2, "delay too large: " + actual + " > " + (expected + expected / 2) + " for attempt " + attempt);
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyDataStreamReconnectWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyDataStreamReconnectWithGrpcCluster.java
new file mode 100644
index 000000000..2be0bc260
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/client/TestNettyDataStreamReconnectWithGrpcCluster.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.DataStreamClient;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
+import org.apache.ratis.datastream.MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.ExponentialBackoffRetry;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 120)
+public class TestNettyDataStreamReconnectWithGrpcCluster extends BaseTest
+    implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
+  {
+    setStateMachine(MultiDataStreamStateMachine.class);
+  }
+
+  @Test
+  public void testReconnectConfigApplied() throws Exception {
+    final RaftProperties properties = getProperties();
+    final TimeDuration reconnectDelay = TimeDuration.valueOf(200, TimeUnit.MILLISECONDS);
+    final TimeDuration reconnectMaxDelay = TimeDuration.valueOf(400, TimeUnit.MILLISECONDS);
+    NettyConfigKeys.DataStream.Client.setReconnectPolicy(properties,
+        "ExponentialBackoffRetry," + reconnectDelay + "," + reconnectMaxDelay + ",10");
+
+    runWithNewCluster(1, cluster -> {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeer primary = cluster.getLeader().getPeer();
+
+      final RaftClient client = cluster.createClient(primary);
+      try {
+        final DataStreamClient dataStreamClient = (DataStreamClient) client.getDataStreamApi();
+        final NettyClientStreamRpc rpc = (NettyClientStreamRpc) dataStreamClient.getClientRpc();
+
+        // Verify reconnect configuration is applied.
+        final RetryPolicy policy = rpc.getReconnectPolicy();
+        assertTrue(policy instanceof ExponentialBackoffRetry);
+        assertSleepInRange(policy, 0, reconnectDelay, reconnectMaxDelay);
+        assertSleepInRange(policy, 1, reconnectDelay, reconnectMaxDelay);
+
+        // Verify the data stream channel can be established.
+        assertTrue(rpc.waitForChannelActive(TimeDuration.valueOf(5, TimeUnit.SECONDS)),
+            "Data stream channel should be active");
+      } finally {
+        IOUtils.cleanup(LOG, client);
+      }
+    });
+  }
+
+  private static void assertSleepInRange(RetryPolicy policy, int attempt, TimeDuration base, TimeDuration max) {
+    final RetryPolicy.Action action = policy.handleAttemptFailure(() -> attempt);
+    assertTrue(action.shouldRetry(), "should retry for attempt " + attempt);
+
+    final long baseMillis = base.toLong(TimeUnit.MILLISECONDS);
+    final long maxMillis = max.toLong(TimeUnit.MILLISECONDS);
+    final long expected = Math.min(maxMillis, baseMillis * (1L << attempt));
+    final long actual = action.getSleepTime().toLong(TimeUnit.MILLISECONDS);
+
+    assertTrue(actual >= expected / 2, "delay too small: " + actual + " < " + expected / 2 + " for attempt " + attempt);
+    assertTrue(actual <= expected + expected / 2, "delay too large: " + actual + " > " + (expected + expected / 2) + " for attempt " + attempt);
+  }
+
+}

Reply via email to