This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 0942c4aa8 feat(java-client): Introduce exponentialBackoff retry 
mechanism (#1822)
0942c4aa8 is described below

commit 0942c4aa8283ab336c98fe09e8851d6950d20745
Author: Samunroyu <[email protected]>
AuthorDate: Thu Mar 7 15:40:09 2024 +0800

    feat(java-client): Introduce exponentialBackoff retry mechanism (#1822)
    
    Introduces a retry mechanism into the Java client to enhance user
    convenience and robustness.
    
    It can be enabled by calling `retryPolicy()`, `retryBaseInterval()`
    and `retryMaxTimes()` when constructing ClientOptions objects.
---
 .../org/apache/pegasus/client/ClientOptions.java   | 79 ++++++++++++++++++++--
 .../pegasus/client/retry/DefaultRetryPolicy.java   | 48 +++++++++++++
 .../retry/ExponentialBackoffRetryPolicy.java       | 66 ++++++++++++++++++
 .../apache/pegasus/client/retry/RetryPolicies.java | 34 ++++++++++
 .../apache/pegasus/client/retry/RetryPolicy.java   | 57 ++++++++++++++++
 .../pegasus/rpc/async/ClientRequestRound.java      |  4 +-
 .../apache/pegasus/rpc/async/ClusterManager.java   | 23 +++++--
 .../apache/pegasus/rpc/async/ReplicaSession.java   |  2 +-
 .../org/apache/pegasus/rpc/async/TableHandler.java | 33 +++++++--
 .../client/retry/TestDefaultRetryPolicy.java       | 49 ++++++++++++++
 .../retry/TestExponentialBackoffRetryPolicy.java   | 66 ++++++++++++++++++
 11 files changed, 442 insertions(+), 19 deletions(-)

diff --git 
a/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java 
b/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java
index b4ab3eb02..f9e07276e 100644
--- a/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java
+++ b/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java
@@ -19,6 +19,7 @@ import static 
org.apache.pegasus.client.PConfigUtil.loadConfiguration;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.Properties;
+import org.apache.pegasus.client.retry.RetryPolicies;
 import org.apache.pegasus.security.Credential;
 import org.apache.pegasus.tools.WriteLimiter;
 import org.apache.pegasus.util.PropertyUtils;
@@ -56,7 +57,9 @@ public class ClientOptions {
   public static final String PEGASUS_AUTH_PROTOCOL_KEY = "auth_protocol";
   public static final String PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY =
       "session_reset_time_window_secs";
-
+  public static final String PEGASUS_RETRY_POLICY_KEY = "retry_policy";
+  public static final String PEGASUS_RETRY_BASE_INTERVAL_MS = 
"retry_base_interval_ms";
+  public static final String PEGASUS_RETRY_MAX_TIMES = "retry_max_times";
   public static final String DEFAULT_META_SERVERS =
       "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
   public static final Duration DEFAULT_OPERATION_TIMEOUT = 
Duration.ofMillis(1000);
@@ -68,7 +71,9 @@ public class ClientOptions {
   public static final Duration DEFAULT_META_QUERY_TIMEOUT = 
Duration.ofMillis(5000);
   public static final String DEFAULT_AUTH_PROTOCOL = "";
   public static final long DEFAULT_SESSION_RESET_SECS_WINDOW = 30;
-
+  public static final String DEFAULT_RETRY_POLICY = 
RetryPolicies.DEFAULT.name().toLowerCase();
+  public static final long DEFAULT_RETRY_BASE_INTERVAL_MS = 20;
+  public static final int DEFAULT_RETRY_MAX_TIMES = Integer.MAX_VALUE;
   private final String metaServers;
   private final Duration operationTimeout;
   private final int asyncWorkers;
@@ -79,6 +84,9 @@ public class ClientOptions {
   private final Duration metaQueryTimeout;
   private final Credential credential;
   private final long sessionResetTimeWindowSecs;
+  private final RetryPolicies retryPolicy;
+  private final Duration retryBaseInterval;
+  private final int retryMaxTimes;
 
   protected ClientOptions(Builder builder) {
     this.metaServers = builder.metaServers;
@@ -91,6 +99,9 @@ public class ClientOptions {
     this.metaQueryTimeout = builder.metaQueryTimeout;
     this.credential = builder.credential;
     this.sessionResetTimeWindowSecs = builder.sessionResetTimeWindowSecs;
+    this.retryPolicy = builder.retryPolicy;
+    this.retryBaseInterval = builder.retryBaseInterval;
+    this.retryMaxTimes = builder.retryMaxTimes;
   }
 
   protected ClientOptions(ClientOptions original) {
@@ -104,6 +115,9 @@ public class ClientOptions {
     this.metaQueryTimeout = original.getMetaQueryTimeout();
     this.credential = original.getCredential();
     this.sessionResetTimeWindowSecs = original.getSessionResetTimeWindowSecs();
+    this.retryPolicy = original.getRetryPolicy();
+    this.retryBaseInterval = original.getRetryBaseInterval();
+    this.retryMaxTimes = original.getRetryMaxTimes();
   }
 
   /**
@@ -176,6 +190,15 @@ public class ClientOptions {
     long sessionResetTimeWindowSecs =
         PropertyUtils.getLong(
             config, PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY, 
DEFAULT_SESSION_RESET_SECS_WINDOW);
+    RetryPolicies retryPolicy =
+        RetryPolicies.valueOf(
+            config.getProperty(PEGASUS_RETRY_POLICY_KEY, 
DEFAULT_RETRY_POLICY.toUpperCase()));
+    Duration retryBaseInterval =
+        Duration.ofMillis(
+            PropertyUtils.getLong(
+                config, PEGASUS_RETRY_BASE_INTERVAL_MS, 
DEFAULT_RETRY_BASE_INTERVAL_MS));
+    int retryMaxTimes =
+        PropertyUtils.getInt(config, PEGASUS_RETRY_MAX_TIMES, 
DEFAULT_RETRY_MAX_TIMES);
 
     return ClientOptions.builder()
         .metaServers(metaList)
@@ -187,6 +210,9 @@ public class ClientOptions {
         .metaQueryTimeout(metaQueryTimeout)
         .credential(credential)
         .sessionResetTimeWindowSecs(sessionResetTimeWindowSecs)
+        .retryPolicy(retryPolicy)
+        .retryBaseInterval(retryBaseInterval)
+        .retryMaxTimes(retryMaxTimes)
         .build();
   }
 
@@ -206,7 +232,10 @@ public class ClientOptions {
           && this.enableWriteLimit == clientOptions.enableWriteLimit
           && this.metaQueryTimeout.toMillis() == 
clientOptions.metaQueryTimeout.toMillis()
           && this.credential == clientOptions.credential
-          && this.sessionResetTimeWindowSecs == 
clientOptions.sessionResetTimeWindowSecs;
+          && this.sessionResetTimeWindowSecs == 
clientOptions.sessionResetTimeWindowSecs
+          && this.retryPolicy == clientOptions.retryPolicy
+          && this.retryBaseInterval.equals(clientOptions.retryBaseInterval)
+          && this.retryMaxTimes == clientOptions.retryMaxTimes;
     }
     return false;
   }
@@ -250,7 +279,13 @@ public class ClientOptions {
             + ", metaQueryTimeout(ms)="
             + metaQueryTimeout.toMillis()
             + ", sessionResetTimeWindowSecs="
-            + sessionResetTimeWindowSecs;
+            + sessionResetTimeWindowSecs
+            + ", retryPolicy="
+            + retryPolicy
+            + ", retryBaseInterval="
+            + retryBaseInterval
+            + ", retryMaxTimes="
+            + retryMaxTimes;
     if (credential != null) {
       res += ", credential=" + credential.toString();
     }
@@ -269,6 +304,9 @@ public class ClientOptions {
     private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
     private Credential credential = null;
     private long sessionResetTimeWindowSecs = 
DEFAULT_SESSION_RESET_SECS_WINDOW;
+    private RetryPolicies retryPolicy = 
RetryPolicies.valueOf(DEFAULT_RETRY_POLICY.toUpperCase());
+    private Duration retryBaseInterval = 
Duration.ofMillis(DEFAULT_RETRY_BASE_INTERVAL_MS);
+    private int retryMaxTimes = DEFAULT_RETRY_MAX_TIMES;
 
     protected Builder() {}
 
@@ -400,6 +438,22 @@ public class ClientOptions {
       return this;
     }
 
+    public Builder retryPolicy(RetryPolicies retryPolicy) {
+      this.retryPolicy = retryPolicy;
+      return this;
+    }
+
+    public Builder retryBaseInterval(Duration retryBaseInterval) {
+      this.retryBaseInterval = retryBaseInterval;
+      return this;
+    }
+
+    public Builder retryMaxTimes(int retryMaxTimes) {
+      assert retryMaxTimes >= 0 : String.format("must pass non negative value: 
%d", retryMaxTimes);
+      this.retryMaxTimes = retryMaxTimes;
+      return this;
+    }
+
     /**
      * Create a new instance of {@link ClientOptions}.
      *
@@ -432,7 +486,10 @@ public class ClientOptions {
         .falconPushInterval(getFalconPushInterval())
         .enableWriteLimit(isWriteLimitEnabled())
         .metaQueryTimeout(getMetaQueryTimeout())
-        .credential(getCredential());
+        .credential(getCredential())
+        .retryPolicy(getRetryPolicy())
+        .retryBaseInterval(getRetryBaseInterval())
+        .retryMaxTimes(getRetryMaxTimes());
     return builder;
   }
 
@@ -531,4 +588,16 @@ public class ClientOptions {
   public long getSessionResetTimeWindowSecs() {
     return sessionResetTimeWindowSecs;
   }
+
+  public RetryPolicies getRetryPolicy() {
+    return retryPolicy;
+  }
+
+  public Duration getRetryBaseInterval() {
+    return retryBaseInterval;
+  }
+
+  public int getRetryMaxTimes() {
+    return retryMaxTimes;
+  }
 }
diff --git 
a/java-client/src/main/java/org/apache/pegasus/client/retry/DefaultRetryPolicy.java
 
b/java-client/src/main/java/org/apache/pegasus/client/retry/DefaultRetryPolicy.java
new file mode 100644
index 000000000..10b795802
--- /dev/null
+++ 
b/java-client/src/main/java/org/apache/pegasus/client/retry/DefaultRetryPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.pegasus.client.ClientOptions;
+
+/**
+ * The default retry policy, which was the only behavior before the retry policy mechanism was
+ * introduced. It derives the retry delay from the timeout value (capped by the time left before
+ * the deadline), which is why {@code timeout} is passed to {@link #shouldRetry(int, long, Duration)}.
+ */
+public class DefaultRetryPolicy implements RetryPolicy {
+  public DefaultRetryPolicy(ClientOptions opts) {
+    // do nothing, just keep the constructor.
+  }
+
+  @Override
+  public RetryAction shouldRetry(int retries, long deadlineNanos, Duration 
timeout) {
+    long now = System.nanoTime();
+    if (now >= deadlineNanos) {
+      return new RetryAction(RetryDecision.FAIL, Duration.ZERO, "request 
deadline reached");
+    }
+    long timeoutNanos = timeout.toNanos();
+    long retryDelayNanos =
+        Math.min(
+            timeoutNanos > 3000000 ? timeoutNanos / 3000000 : 1,
+            TimeUnit.NANOSECONDS.toNanos(deadlineNanos - now));
+    return new RetryAction(RetryDecision.RETRY, 
Duration.ofNanos(retryDelayNanos), "");
+  }
+}
diff --git 
a/java-client/src/main/java/org/apache/pegasus/client/retry/ExponentialBackoffRetryPolicy.java
 
b/java-client/src/main/java/org/apache/pegasus/client/retry/ExponentialBackoffRetryPolicy.java
new file mode 100644
index 000000000..40b70638c
--- /dev/null
+++ 
b/java-client/src/main/java/org/apache/pegasus/client/retry/ExponentialBackoffRetryPolicy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.pegasus.client.ClientOptions;
+
+/** A retry policy which generates retry interval with exponential backoff 
policy. */
+public class ExponentialBackoffRetryPolicy implements RetryPolicy {
+
+  private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 
100, 100, 100, 200, 200};
+
+  private final long retryBaseIntervalNanos;
+
+  private final int retryMaxTimes;
+
+  public ExponentialBackoffRetryPolicy(ClientOptions opts) {
+    // The smallest retry base interval a user may configure is one millisecond.
+    // A sub-millisecond base interval falls back to the default base interval.
+    if (opts.getRetryBaseInterval().toMillis() < 1) {
+      this.retryBaseIntervalNanos = 
ClientOptions.DEFAULT_RETRY_BASE_INTERVAL_MS * 1000000;
+    } else {
+      this.retryBaseIntervalNanos = opts.getRetryBaseInterval().toNanos();
+    }
+    this.retryMaxTimes = opts.getRetryMaxTimes();
+  }
+
+  @Override
+  public RetryAction shouldRetry(int retries, long deadlineNanos, Duration 
timeout) {
+    if (retries >= retryMaxTimes) {
+      return new RetryAction(
+          RetryDecision.FAIL, Duration.ZERO, "max retry times " + 
retryMaxTimes + " reached");
+    }
+    long now = System.nanoTime();
+    if (now >= deadlineNanos) {
+      return new RetryAction(RetryDecision.FAIL, Duration.ZERO, "request 
deadline reached");
+    }
+    long normalIntervalNanos =
+        retryBaseIntervalNanos * RETRY_BACKOFF[Math.min(retries, 
RETRY_BACKOFF.length - 1)];
+    // 1% possible jitter
+    long jitterNanos =
+        (long) (normalIntervalNanos * ThreadLocalRandom.current().nextFloat() 
* 0.01f);
+    long retryIntervalNanos =
+        Math.min(
+            normalIntervalNanos + jitterNanos, 
TimeUnit.NANOSECONDS.toNanos(deadlineNanos - now));
+    return new RetryAction(RetryDecision.RETRY, 
Duration.ofNanos(retryIntervalNanos), "");
+  }
+}
diff --git 
a/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicies.java 
b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicies.java
new file mode 100644
index 000000000..2809329d8
--- /dev/null
+++ 
b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicies.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+public enum RetryPolicies {
+  DEFAULT(DefaultRetryPolicy.class),
+  EXPONENTIAL(ExponentialBackoffRetryPolicy.class);
+
+  private final Class<? extends RetryPolicy> clazz;
+
+  RetryPolicies(Class<? extends RetryPolicy> clazz) {
+    this.clazz = clazz;
+  }
+
+  public Class<? extends RetryPolicy> getImplementationClass() {
+    return clazz;
+  }
+}
diff --git 
a/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicy.java 
b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicy.java
new file mode 100644
index 000000000..71b461e6a
--- /dev/null
+++ b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import java.time.Duration;
+
+public interface RetryPolicy {
+
+  enum RetryDecision {
+    FAIL,
+    RETRY
+  }
+
+  final class RetryAction {
+    private final RetryDecision decision;
+
+    private final Duration delay;
+
+    private final String reason;
+
+    public RetryAction(RetryDecision decision, Duration delay, String reason) {
+      this.decision = decision;
+      this.delay = delay;
+      this.reason = reason;
+    }
+
+    public RetryDecision getDecision() {
+      return decision;
+    }
+
+    public Duration getDelay() {
+      return delay;
+    }
+
+    public String getReason() {
+      return reason;
+    }
+  }
+
+  RetryAction shouldRetry(int retries, long deadlineNanos, Duration timeout);
+}
diff --git 
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
 
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
index 84be8a22d..83769eb39 100644
--- 
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
+++ 
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
@@ -68,9 +68,11 @@ public final class ClientRequestRound {
       Table.ClientOPCallback cb,
       boolean enableCounter,
       long expireNanoTime,
-      long timeoutInMilliseconds) {
+      long timeoutInMilliseconds,
+      int tryId) {
     this(op, cb, enableCounter, timeoutInMilliseconds);
     this.expireNanoTime = expireNanoTime;
+    this.tryId = tryId;
   }
 
   public long timeoutMs() {
diff --git 
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java 
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
index f38c65fb0..01899abd6 100644
--- a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
+++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
@@ -30,6 +30,8 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pegasus.base.rpc_address;
 import org.apache.pegasus.client.ClientOptions;
+import org.apache.pegasus.client.retry.DefaultRetryPolicy;
+import org.apache.pegasus.client.retry.RetryPolicy;
 import org.apache.pegasus.metrics.MetricsManager;
 import org.apache.pegasus.rpc.Cluster;
 import org.apache.pegasus.rpc.InternalTableOptions;
@@ -42,7 +44,7 @@ public class ClusterManager extends Cluster {
 
   private int operationTimeout;
   private long sessionResetTimeWindowSecs;
-  private int retryDelay;
+  private RetryPolicy retryPolicy;
   private boolean enableCounter;
 
   private ConcurrentHashMap<rpc_address, ReplicaSession> replicaSessions;
@@ -68,6 +70,19 @@ public class ClusterManager extends Cluster {
     setTimeout((int) opts.getOperationTimeout().toMillis());
     this.enableCounter = opts.isEnablePerfCounter();
     this.sessionResetTimeWindowSecs = opts.getSessionResetTimeWindowSecs();
+    try {
+      this.retryPolicy =
+          opts.getRetryPolicy()
+              .getImplementationClass()
+              .getConstructor(ClientOptions.class)
+              .newInstance(opts);
+    } catch (Exception e) {
+      logger.warn(
+          "failed to create retry policy {}, use default retry policy instead",
+          opts.getRetryPolicy(),
+          e);
+      this.retryPolicy = new DefaultRetryPolicy(opts);
+    }
     if (enableCounter) {
       MetricsManager.detectHostAndInit(
           opts.getFalconPerfCounterTags(), (int) 
opts.getFalconPushInterval().getSeconds());
@@ -125,8 +140,8 @@ public class ClusterManager extends Cluster {
     return (timeoutMs < 3 ? 1 : timeoutMs / 3);
   }
 
-  public int getRetryDelay() {
-    return retryDelay;
+  public RetryPolicy getRetryPolicy() {
+    return retryPolicy;
   }
 
   public boolean counterEnabled() {
@@ -135,8 +150,6 @@ public class ClusterManager extends Cluster {
 
   public void setTimeout(int t) {
     operationTimeout = t;
-    // set retry delay as t/3.
-    retryDelay = (t < 3 ? 1 : t / 3);
   }
 
   public static EventLoopGroup getEventLoopGroupInstance(int threadsCount) {
diff --git 
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java 
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java
index de058b62d..ff0fd7960 100644
--- a/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java
+++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java
@@ -442,7 +442,7 @@ public class ReplicaSession {
 
     @Override
     public void channelRead0(ChannelHandlerContext ctx, final RequestEntry 
msg) {
-      logger.debug("{}: handle response with seqid({})", name(), 
msg.sequenceId);
+      logger.trace("{}: handle response with seqid({})", name(), 
msg.sequenceId);
       firstRecentTimedOutMs.set(0); // This session is currently healthy.
       if (msg.callback != null) {
         msg.callback.run();
diff --git 
a/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java 
b/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java
index 33247acf5..9bd851938 100644
--- a/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java
+++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java
@@ -20,6 +20,7 @@ package org.apache.pegasus.rpc.async;
 
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.EventExecutor;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.pegasus.base.gpid;
 import org.apache.pegasus.base.rpc_address;
 import org.apache.pegasus.client.FutureGroup;
 import org.apache.pegasus.client.PException;
+import org.apache.pegasus.client.retry.RetryPolicy;
 import org.apache.pegasus.operator.client_operator;
 import org.apache.pegasus.operator.query_cfg_operator;
 import org.apache.pegasus.replication.partition_configuration;
@@ -263,7 +265,7 @@ public class TableHandler extends Table {
     if (!inQuerying_.compareAndSet(false, true)) return false;
 
     long now = System.currentTimeMillis();
-    if (now - lastQueryTime_ < manager_.getRetryDelay()) {
+    if (now - lastQueryTime_ < Math.max(1, manager_.getTimeout() / 3)) {
       inQuerying_.set(false);
       return false;
     }
@@ -383,14 +385,26 @@ public class TableHandler extends Table {
             round.callback,
             round.enableCounter,
             round.expireNanoTime,
-            round.timeoutMs);
+            round.timeoutMs,
+            round.tryId);
     tryDelayCall(delayRequestRound);
   }
 
-  void tryDelayCall(final ClientRequestRound round) {
-    round.tryId++;
-    long nanoDelay = manager_.getRetryDelay(round.timeoutMs) * 1000000L;
-    if (round.expireNanoTime - System.nanoTime() > nanoDelay) {
+  private void tryDelayCall(final ClientRequestRound round) {
+    // tryId starts from 1, so subtract 1 to pass the zero-based retry count
+    RetryPolicy.RetryAction action =
+        manager_
+            .getRetryPolicy()
+            .shouldRetry(round.tryId - 1, round.expireNanoTime, 
Duration.ofMillis(round.timeoutMs));
+    if (action.getDecision() == RetryPolicy.RetryDecision.RETRY) {
+      logger.debug(
+          "retry policy {} decided to retry after {} for operation with 
hashcode {},"
+              + " retry = {}",
+          manager_.getRetryPolicy().getClass().getSimpleName(),
+          action.getDelay(),
+          System.identityHashCode(round.getOperator()),
+          round.tryId);
+      round.tryId++;
       executor_.schedule(
           new Runnable() {
             @Override
@@ -398,9 +412,14 @@ public class TableHandler extends Table {
               call(round);
             }
           },
-          nanoDelay,
+          action.getDelay().toNanos(),
           TimeUnit.NANOSECONDS);
     } else {
+      logger.debug(
+          "retry policy {} decided to fail for operation with hashcode {}," + 
" retry = {}",
+          manager_.getRetryPolicy().getClass().getSimpleName(),
+          System.identityHashCode(round.getOperator()),
+          round.tryId);
       // errno == ERR_UNKNOWN means the request has never attemptted to 
contact any replica servers
       // this may happen when we can't initialize a null replica session for a 
long time
       if (round.getOperator().rpc_error.errno == 
error_code.error_types.ERR_UNKNOWN) {
diff --git 
a/java-client/src/test/java/org/apache/pegasus/client/retry/TestDefaultRetryPolicy.java
 
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestDefaultRetryPolicy.java
new file mode 100644
index 000000000..c5ea42d06
--- /dev/null
+++ 
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestDefaultRetryPolicy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+public class TestDefaultRetryPolicy {
+
+  @Test
+  public void testShouldRetry() {
+    DefaultRetryPolicy policy = new DefaultRetryPolicy(null);
+    long now = System.nanoTime();
+    RetryPolicy.RetryAction action = policy.shouldRetry(1, now - 100, 
Duration.ofMillis(100));
+    assertEquals(RetryPolicy.RetryDecision.FAIL, action.getDecision());
+
+    now = System.nanoTime();
+    action = policy.shouldRetry(1, now + TimeUnit.MINUTES.toNanos(1), 
Duration.ofMillis(300));
+    assertEquals(RetryPolicy.RetryDecision.RETRY, action.getDecision());
+    assertEquals(100, action.getDelay().toNanos());
+
+    now = System.nanoTime();
+    action = policy.shouldRetry(1, now + TimeUnit.SECONDS.toNanos(1), 
Duration.ofSeconds(10));
+    assertEquals(RetryPolicy.RetryDecision.RETRY, action.getDecision());
+    // should not have a delay which makes the nanos greater than deadline
+    assertThat(action.getDelay().toNanos(), 
lessThanOrEqualTo(TimeUnit.SECONDS.toNanos(1)));
+  }
+}
diff --git 
a/java-client/src/test/java/org/apache/pegasus/client/retry/TestExponentialBackoffRetryPolicy.java
 
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestExponentialBackoffRetryPolicy.java
new file mode 100644
index 000000000..248b38972
--- /dev/null
+++ 
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestExponentialBackoffRetryPolicy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.pegasus.client.ClientOptions;
+import org.junit.jupiter.api.Test;
+
+public class TestExponentialBackoffRetryPolicy {
+
+  @Test
+  public void test() {
+    ClientOptions opts =
+        
ClientOptions.builder().retryBaseInterval(Duration.ofMillis(10)).retryMaxTimes(200).build();
+    ExponentialBackoffRetryPolicy policy = new 
ExponentialBackoffRetryPolicy(opts);
+
+    long now = System.nanoTime();
+    RetryPolicy.RetryAction action = policy.shouldRetry(0, now + 
TimeUnit.MINUTES.toNanos(1), null);
+    assertEquals(action.getDecision(), RetryPolicy.RetryDecision.RETRY);
+    // exp = 1
+    assertThat(action.getDelay().toMillis(), 
both(greaterThan(0L)).and(lessThan(20L)));
+
+    now = System.nanoTime();
+    action = policy.shouldRetry(1, now + TimeUnit.MINUTES.toNanos(1), null);
+    assertEquals(action.getDecision(), RetryPolicy.RetryDecision.RETRY);
+    // exp = 2
+    assertThat(action.getDelay().toMillis(), 
both(greaterThan(10L)).and(lessThan(30L)));
+
+    now = System.nanoTime();
+    action = policy.shouldRetry(100, now + TimeUnit.MINUTES.toNanos(1), null);
+    assertEquals(action.getDecision(), RetryPolicy.RetryDecision.RETRY);
+    // exp = 200
+    assertThat(action.getDelay().toMillis(), 
both(greaterThan(1500L)).and(lessThan(2500L)));
+
+    now = System.nanoTime();
+    action = policy.shouldRetry(1000, now + TimeUnit.MINUTES.toNanos(1), null);
+    // reach max times
+    assertEquals(action.getDecision(), RetryPolicy.RetryDecision.FAIL);
+
+    now = System.nanoTime();
+    action = policy.shouldRetry(1000, now - 100, null);
+    // reach deadline
+    assertEquals(action.getDecision(), RetryPolicy.RetryDecision.FAIL);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to