This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 0942c4aa8 feat(java-client): Introduce exponentialBackoff retry
mechanism (#1822)
0942c4aa8 is described below
commit 0942c4aa8283ab336c98fe09e8851d6950d20745
Author: Samunroyu <[email protected]>
AuthorDate: Thu Mar 7 15:40:09 2024 +0800
feat(java-client): Introduce exponentialBackoff retry mechanism (#1822)
Introduces a retry mechanism into the Java client to enhance user
convenience and robustness.
It can be enabled by calling `retryPolicy()`, `retryBaseInterval()`
and `retryMaxTimes()` when constructing a ClientOptions object, or by
setting `retry_policy`, `retry_base_interval_ms` and `retry_max_times`
in the client configuration.
---
.../org/apache/pegasus/client/ClientOptions.java | 79 ++++++++++++++++++++--
.../pegasus/client/retry/DefaultRetryPolicy.java | 48 +++++++++++++
.../retry/ExponentialBackoffRetryPolicy.java | 66 ++++++++++++++++++
.../apache/pegasus/client/retry/RetryPolicies.java | 34 ++++++++++
.../apache/pegasus/client/retry/RetryPolicy.java | 57 ++++++++++++++++
.../pegasus/rpc/async/ClientRequestRound.java | 4 +-
.../apache/pegasus/rpc/async/ClusterManager.java | 23 +++++--
.../apache/pegasus/rpc/async/ReplicaSession.java | 2 +-
.../org/apache/pegasus/rpc/async/TableHandler.java | 33 +++++++--
.../client/retry/TestDefaultRetryPolicy.java | 49 ++++++++++++++
.../retry/TestExponentialBackoffRetryPolicy.java | 66 ++++++++++++++++++
11 files changed, 442 insertions(+), 19 deletions(-)
diff --git
a/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java
b/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java
index b4ab3eb02..f9e07276e 100644
--- a/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java
+++ b/java-client/src/main/java/org/apache/pegasus/client/ClientOptions.java
@@ -19,6 +19,7 @@ import static
org.apache.pegasus.client.PConfigUtil.loadConfiguration;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
+import org.apache.pegasus.client.retry.RetryPolicies;
import org.apache.pegasus.security.Credential;
import org.apache.pegasus.tools.WriteLimiter;
import org.apache.pegasus.util.PropertyUtils;
@@ -56,7 +57,9 @@ public class ClientOptions {
public static final String PEGASUS_AUTH_PROTOCOL_KEY = "auth_protocol";
public static final String PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY =
"session_reset_time_window_secs";
-
+ public static final String PEGASUS_RETRY_POLICY_KEY = "retry_policy";
+ public static final String PEGASUS_RETRY_BASE_INTERVAL_MS =
"retry_base_interval_ms";
+ public static final String PEGASUS_RETRY_MAX_TIMES = "retry_max_times";
public static final String DEFAULT_META_SERVERS =
"127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
public static final Duration DEFAULT_OPERATION_TIMEOUT =
Duration.ofMillis(1000);
@@ -68,7 +71,9 @@ public class ClientOptions {
public static final Duration DEFAULT_META_QUERY_TIMEOUT =
Duration.ofMillis(5000);
public static final String DEFAULT_AUTH_PROTOCOL = "";
public static final long DEFAULT_SESSION_RESET_SECS_WINDOW = 30;
-
+ public static final String DEFAULT_RETRY_POLICY =
RetryPolicies.DEFAULT.name().toLowerCase();
+ public static final long DEFAULT_RETRY_BASE_INTERVAL_MS = 20;
+ public static final int DEFAULT_RETRY_MAX_TIMES = Integer.MAX_VALUE;
private final String metaServers;
private final Duration operationTimeout;
private final int asyncWorkers;
@@ -79,6 +84,9 @@ public class ClientOptions {
private final Duration metaQueryTimeout;
private final Credential credential;
private final long sessionResetTimeWindowSecs;
+ private final RetryPolicies retryPolicy;
+ private final Duration retryBaseInterval;
+ private final int retryMaxTimes;
protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
@@ -91,6 +99,9 @@ public class ClientOptions {
this.metaQueryTimeout = builder.metaQueryTimeout;
this.credential = builder.credential;
this.sessionResetTimeWindowSecs = builder.sessionResetTimeWindowSecs;
+ this.retryPolicy = builder.retryPolicy;
+ this.retryBaseInterval = builder.retryBaseInterval;
+ this.retryMaxTimes = builder.retryMaxTimes;
}
protected ClientOptions(ClientOptions original) {
@@ -104,6 +115,9 @@ public class ClientOptions {
this.metaQueryTimeout = original.getMetaQueryTimeout();
this.credential = original.getCredential();
this.sessionResetTimeWindowSecs = original.getSessionResetTimeWindowSecs();
+ this.retryPolicy = original.getRetryPolicy();
+ this.retryBaseInterval = original.getRetryBaseInterval();
+ this.retryMaxTimes = original.getRetryMaxTimes();
}
/**
@@ -176,6 +190,15 @@ public class ClientOptions {
long sessionResetTimeWindowSecs =
PropertyUtils.getLong(
config, PEGASUS_SESSION_RESET_TIME_WINDOW_SECS_KEY,
DEFAULT_SESSION_RESET_SECS_WINDOW);
+ RetryPolicies retryPolicy =
+ RetryPolicies.valueOf(
+ config.getProperty(PEGASUS_RETRY_POLICY_KEY,
DEFAULT_RETRY_POLICY.toUpperCase()));
+ Duration retryBaseInterval =
+ Duration.ofMillis(
+ PropertyUtils.getLong(
+ config, PEGASUS_RETRY_BASE_INTERVAL_MS,
DEFAULT_RETRY_BASE_INTERVAL_MS));
+ int retryMaxTimes =
+ PropertyUtils.getInt(config, PEGASUS_RETRY_MAX_TIMES,
DEFAULT_RETRY_MAX_TIMES);
return ClientOptions.builder()
.metaServers(metaList)
@@ -187,6 +210,9 @@ public class ClientOptions {
.metaQueryTimeout(metaQueryTimeout)
.credential(credential)
.sessionResetTimeWindowSecs(sessionResetTimeWindowSecs)
+ .retryPolicy(retryPolicy)
+ .retryBaseInterval(retryBaseInterval)
+ .retryMaxTimes(retryMaxTimes)
.build();
}
@@ -206,7 +232,10 @@ public class ClientOptions {
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() ==
clientOptions.metaQueryTimeout.toMillis()
&& this.credential == clientOptions.credential
- && this.sessionResetTimeWindowSecs ==
clientOptions.sessionResetTimeWindowSecs;
+ && this.sessionResetTimeWindowSecs ==
clientOptions.sessionResetTimeWindowSecs
+ && this.retryPolicy == clientOptions.retryPolicy
+ && this.retryBaseInterval.equals(clientOptions.retryBaseInterval)
+ && this.retryMaxTimes == clientOptions.retryMaxTimes;
}
return false;
}
@@ -250,7 +279,13 @@ public class ClientOptions {
+ ", metaQueryTimeout(ms)="
+ metaQueryTimeout.toMillis()
+ ", sessionResetTimeWindowSecs="
- + sessionResetTimeWindowSecs;
+ + sessionResetTimeWindowSecs
+ + ", retryPolicy="
+ + retryPolicy
+ + ", retryBaseInterval="
+ + retryBaseInterval
+ + ", retryMaxTimes="
+ + retryMaxTimes;
if (credential != null) {
res += ", credential=" + credential.toString();
}
@@ -269,6 +304,9 @@ public class ClientOptions {
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
private Credential credential = null;
private long sessionResetTimeWindowSecs =
DEFAULT_SESSION_RESET_SECS_WINDOW;
+ private RetryPolicies retryPolicy =
RetryPolicies.valueOf(DEFAULT_RETRY_POLICY.toUpperCase());
+ private Duration retryBaseInterval =
Duration.ofMillis(DEFAULT_RETRY_BASE_INTERVAL_MS);
+ private int retryMaxTimes = DEFAULT_RETRY_MAX_TIMES;
protected Builder() {}
@@ -400,6 +438,22 @@ public class ClientOptions {
return this;
}
+ public Builder retryPolicy(RetryPolicies retryPolicy) {
+ this.retryPolicy = retryPolicy;
+ return this;
+ }
+
+ public Builder retryBaseInterval(Duration retryBaseInterval) {
+ this.retryBaseInterval = retryBaseInterval;
+ return this;
+ }
+
+ public Builder retryMaxTimes(int retryMaxTimes) {
+ assert retryMaxTimes >= 0 : String.format("must pass non negative value:
%d", retryMaxTimes);
+ this.retryMaxTimes = retryMaxTimes;
+ return this;
+ }
+
/**
* Create a new instance of {@link ClientOptions}.
*
@@ -432,7 +486,10 @@ public class ClientOptions {
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout())
- .credential(getCredential());
+ .credential(getCredential())
+ .retryPolicy(getRetryPolicy())
+ .retryBaseInterval(getRetryBaseInterval())
+ .retryMaxTimes(getRetryMaxTimes());
return builder;
}
@@ -531,4 +588,16 @@ public class ClientOptions {
public long getSessionResetTimeWindowSecs() {
return sessionResetTimeWindowSecs;
}
+
+ public RetryPolicies getRetryPolicy() {
+ return retryPolicy;
+ }
+
+ public Duration getRetryBaseInterval() {
+ return retryBaseInterval;
+ }
+
+ public int getRetryMaxTimes() {
+ return retryMaxTimes;
+ }
}
diff --git
a/java-client/src/main/java/org/apache/pegasus/client/retry/DefaultRetryPolicy.java
b/java-client/src/main/java/org/apache/pegasus/client/retry/DefaultRetryPolicy.java
new file mode 100644
index 000000000..10b795802
--- /dev/null
+++
b/java-client/src/main/java/org/apache/pegasus/client/retry/DefaultRetryPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.pegasus.client.ClientOptions;
+
+/**
+ * The default retry policy, which was the only behavior before the retry policy mechanism was
+ * introduced. It ignores the retry count and derives the delay from the operation timeout alone:
+ * each retry waits {@code max(1, timeoutMs / 3)} milliseconds, capped by the time remaining until
+ * the request deadline. That is why {@code timeout} must be passed to {@link #shouldRetry(int,
+ * long, Duration)}.
+ */
+public class DefaultRetryPolicy implements RetryPolicy {
+  /**
+   * Creates the policy. {@code opts} is unused; the {@code (ClientOptions)} constructor exists so
+   * this policy can be instantiated the same way as the other policies.
+   */
+  public DefaultRetryPolicy(ClientOptions opts) {
+    // Intentionally empty: this policy needs no configuration.
+  }
+
+  @Override
+  public RetryAction shouldRetry(int retries, long deadlineNanos, Duration timeout) {
+    long now = System.nanoTime();
+    // System.nanoTime() values must be compared by subtraction, since they may overflow.
+    long remainingNanos = deadlineNanos - now;
+    if (remainingNanos <= 0) {
+      return new RetryAction(RetryDecision.FAIL, Duration.ZERO, "request deadline reached");
+    }
+    // Wait a third of the operation timeout (at least 1ms), matching the delay the client used
+    // before retry policies existed. The value is in MILLISECONDS and must be converted to nanos;
+    // it is also capped so the retry never lands after the deadline.
+    long delayMillis = Math.max(1, timeout.toMillis() / 3);
+    long retryDelayNanos = Math.min(TimeUnit.MILLISECONDS.toNanos(delayMillis), remainingNanos);
+    return new RetryAction(RetryDecision.RETRY, Duration.ofNanos(retryDelayNanos), "");
+  }
+}
diff --git
a/java-client/src/main/java/org/apache/pegasus/client/retry/ExponentialBackoffRetryPolicy.java
b/java-client/src/main/java/org/apache/pegasus/client/retry/ExponentialBackoffRetryPolicy.java
new file mode 100644
index 000000000..40b70638c
--- /dev/null
+++
b/java-client/src/main/java/org/apache/pegasus/client/retry/ExponentialBackoffRetryPolicy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.pegasus.client.ClientOptions;
+
+/**
+ * A retry policy whose interval grows with the number of retries already made.
+ *
+ * <p>The delay for a given {@code retries} value is {@code baseInterval * RETRY_BACKOFF[retries]}
+ * (the last table entry is reused once {@code retries} runs past the table), plus up to 1% random
+ * jitter, capped by the time remaining until the request deadline. The policy fails once {@code
+ * retryMaxTimes} is reached or the deadline has passed.
+ */
+public class ExponentialBackoffRetryPolicy implements RetryPolicy {
+
+  /** Multipliers applied to the base interval, indexed by retry count. */
+  private static final int[] RETRY_BACKOFF = {
+    1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200
+  };
+
+  private final long retryBaseIntervalNanos;
+
+  private final int retryMaxTimes;
+
+  public ExponentialBackoffRetryPolicy(ClientOptions opts) {
+    // The base interval is configured with millisecond granularity. Anything shorter than 1ms
+    // (including zero or negative durations) falls back to DEFAULT_RETRY_BASE_INTERVAL_MS.
+    if (opts.getRetryBaseInterval().toMillis() < 1) {
+      this.retryBaseIntervalNanos =
+          TimeUnit.MILLISECONDS.toNanos(ClientOptions.DEFAULT_RETRY_BASE_INTERVAL_MS);
+    } else {
+      this.retryBaseIntervalNanos = opts.getRetryBaseInterval().toNanos();
+    }
+    this.retryMaxTimes = opts.getRetryMaxTimes();
+  }
+
+  @Override
+  public RetryAction shouldRetry(int retries, long deadlineNanos, Duration timeout) {
+    if (retries >= retryMaxTimes) {
+      return new RetryAction(
+          RetryDecision.FAIL, Duration.ZERO, "max retry times " + retryMaxTimes + " reached");
+    }
+    long now = System.nanoTime();
+    // System.nanoTime() values must be compared by subtraction, since they may overflow.
+    long remainingNanos = deadlineNanos - now;
+    if (remainingNanos <= 0) {
+      return new RetryAction(RetryDecision.FAIL, Duration.ZERO, "request deadline reached");
+    }
+    // Clamp into the table so a negative count cannot index out of bounds.
+    int backoffIndex = Math.min(Math.max(retries, 0), RETRY_BACKOFF.length - 1);
+    long normalIntervalNanos = retryBaseIntervalNanos * RETRY_BACKOFF[backoffIndex];
+    // Up to 1% random jitter (presumably to keep concurrent retries from firing in lockstep).
+    long jitterNanos =
+        (long) (normalIntervalNanos * ThreadLocalRandom.current().nextFloat() * 0.01f);
+    long retryIntervalNanos = Math.min(normalIntervalNanos + jitterNanos, remainingNanos);
+    return new RetryAction(RetryDecision.RETRY, Duration.ofNanos(retryIntervalNanos), "");
+  }
+}
diff --git
a/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicies.java
b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicies.java
new file mode 100644
index 000000000..2809329d8
--- /dev/null
+++
b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicies.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+/**
+ * Built-in retry policies that can be selected through {@code ClientOptions}. Each constant is
+ * bound to the {@link RetryPolicy} class that implements it.
+ */
+public enum RetryPolicies {
+  /** The pre-existing timeout-based policy, implemented by {@link DefaultRetryPolicy}. */
+  DEFAULT(DefaultRetryPolicy.class),
+  /** Backoff with jitter, implemented by {@link ExponentialBackoffRetryPolicy}. */
+  EXPONENTIAL(ExponentialBackoffRetryPolicy.class);
+
+  private final Class<? extends RetryPolicy> implementationClass;
+
+  RetryPolicies(Class<? extends RetryPolicy> implementationClass) {
+    this.implementationClass = implementationClass;
+  }
+
+  /** Returns the class that implements this policy. */
+  public Class<? extends RetryPolicy> getImplementationClass() {
+    return implementationClass;
+  }
+}
diff --git
a/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicy.java
b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicy.java
new file mode 100644
index 000000000..71b461e6a
--- /dev/null
+++ b/java-client/src/main/java/org/apache/pegasus/client/retry/RetryPolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import java.time.Duration;
+
+/** Decides whether a failed request should be retried and, if so, how long to wait first. */
+public interface RetryPolicy {
+
+  /** Outcome of a retry decision. */
+  enum RetryDecision {
+    FAIL,
+    RETRY
+  }
+
+  /** Result of {@link #shouldRetry}: the decision, the delay before retrying, and a reason. */
+  final class RetryAction {
+    private final RetryDecision retryDecision;
+    private final Duration retryDelay;
+    private final String explanation;
+
+    public RetryAction(RetryDecision decision, Duration delay, String reason) {
+      retryDecision = decision;
+      retryDelay = delay;
+      explanation = reason;
+    }
+
+    /** Returns whether to retry or fail. */
+    public RetryDecision getDecision() {
+      return retryDecision;
+    }
+
+    /** Returns how long to wait before the next attempt. */
+    public Duration getDelay() {
+      return retryDelay;
+    }
+
+    /** Returns a human-readable reason for the decision (may be empty). */
+    public String getReason() {
+      return explanation;
+    }
+  }
+
+  /**
+   * Decides whether to retry a request.
+   *
+   * @param retries number of retries already made for the request
+   * @param deadlineNanos request deadline, expressed as a {@link System#nanoTime()} value
+   * @param timeout the operation timeout; implementations may ignore it
+   * @return the decision and, for {@link RetryDecision#RETRY}, the delay before retrying
+   */
+  RetryAction shouldRetry(int retries, long deadlineNanos, Duration timeout);
+}
diff --git
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
index 84be8a22d..83769eb39 100644
---
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
+++
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClientRequestRound.java
@@ -68,9 +68,11 @@ public final class ClientRequestRound {
Table.ClientOPCallback cb,
boolean enableCounter,
long expireNanoTime,
- long timeoutInMilliseconds) {
+ long timeoutInMilliseconds,
+ int tryId) {
this(op, cb, enableCounter, timeoutInMilliseconds);
this.expireNanoTime = expireNanoTime;
+ this.tryId = tryId;
}
public long timeoutMs() {
diff --git
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
index f38c65fb0..01899abd6 100644
--- a/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
+++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/ClusterManager.java
@@ -30,6 +30,8 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pegasus.base.rpc_address;
import org.apache.pegasus.client.ClientOptions;
+import org.apache.pegasus.client.retry.DefaultRetryPolicy;
+import org.apache.pegasus.client.retry.RetryPolicy;
import org.apache.pegasus.metrics.MetricsManager;
import org.apache.pegasus.rpc.Cluster;
import org.apache.pegasus.rpc.InternalTableOptions;
@@ -42,7 +44,7 @@ public class ClusterManager extends Cluster {
private int operationTimeout;
private long sessionResetTimeWindowSecs;
- private int retryDelay;
+ private RetryPolicy retryPolicy;
private boolean enableCounter;
private ConcurrentHashMap<rpc_address, ReplicaSession> replicaSessions;
@@ -68,6 +70,19 @@ public class ClusterManager extends Cluster {
setTimeout((int) opts.getOperationTimeout().toMillis());
this.enableCounter = opts.isEnablePerfCounter();
this.sessionResetTimeWindowSecs = opts.getSessionResetTimeWindowSecs();
+ try {
+ this.retryPolicy =
+ opts.getRetryPolicy()
+ .getImplementationClass()
+ .getConstructor(ClientOptions.class)
+ .newInstance(opts);
+ } catch (Exception e) {
+ logger.warn(
+ "failed to create retry policy {}, use default retry policy instead",
+ opts.getRetryPolicy(),
+ e);
+ this.retryPolicy = new DefaultRetryPolicy(opts);
+ }
if (enableCounter) {
MetricsManager.detectHostAndInit(
opts.getFalconPerfCounterTags(), (int)
opts.getFalconPushInterval().getSeconds());
@@ -125,8 +140,8 @@ public class ClusterManager extends Cluster {
return (timeoutMs < 3 ? 1 : timeoutMs / 3);
}
- public int getRetryDelay() {
- return retryDelay;
+ public RetryPolicy getRetryPolicy() {
+ return retryPolicy;
}
public boolean counterEnabled() {
@@ -135,8 +150,6 @@ public class ClusterManager extends Cluster {
public void setTimeout(int t) {
operationTimeout = t;
- // set retry delay as t/3.
- retryDelay = (t < 3 ? 1 : t / 3);
}
public static EventLoopGroup getEventLoopGroupInstance(int threadsCount) {
diff --git
a/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java
b/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java
index de058b62d..ff0fd7960 100644
--- a/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java
+++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/ReplicaSession.java
@@ -442,7 +442,7 @@ public class ReplicaSession {
@Override
public void channelRead0(ChannelHandlerContext ctx, final RequestEntry
msg) {
- logger.debug("{}: handle response with seqid({})", name(),
msg.sequenceId);
+ logger.trace("{}: handle response with seqid({})", name(),
msg.sequenceId);
firstRecentTimedOutMs.set(0); // This session is currently healthy.
if (msg.callback != null) {
msg.callback.run();
diff --git
a/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java
b/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java
index 33247acf5..9bd851938 100644
--- a/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java
+++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java
@@ -20,6 +20,7 @@ package org.apache.pegasus.rpc.async;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.pegasus.base.gpid;
import org.apache.pegasus.base.rpc_address;
import org.apache.pegasus.client.FutureGroup;
import org.apache.pegasus.client.PException;
+import org.apache.pegasus.client.retry.RetryPolicy;
import org.apache.pegasus.operator.client_operator;
import org.apache.pegasus.operator.query_cfg_operator;
import org.apache.pegasus.replication.partition_configuration;
@@ -263,7 +265,7 @@ public class TableHandler extends Table {
if (!inQuerying_.compareAndSet(false, true)) return false;
long now = System.currentTimeMillis();
- if (now - lastQueryTime_ < manager_.getRetryDelay()) {
+ if (now - lastQueryTime_ < Math.max(1, manager_.getTimeout() / 3)) {
inQuerying_.set(false);
return false;
}
@@ -383,14 +385,26 @@ public class TableHandler extends Table {
round.callback,
round.enableCounter,
round.expireNanoTime,
- round.timeoutMs);
+ round.timeoutMs,
+ round.tryId);
tryDelayCall(delayRequestRound);
}
- void tryDelayCall(final ClientRequestRound round) {
- round.tryId++;
- long nanoDelay = manager_.getRetryDelay(round.timeoutMs) * 1000000L;
- if (round.expireNanoTime - System.nanoTime() > nanoDelay) {
+ private void tryDelayCall(final ClientRequestRound round) {
+ // tryId starts from 1 so here we minus 1
+ RetryPolicy.RetryAction action =
+ manager_
+ .getRetryPolicy()
+ .shouldRetry(round.tryId - 1, round.expireNanoTime,
Duration.ofMillis(round.timeoutMs));
+ if (action.getDecision() == RetryPolicy.RetryDecision.RETRY) {
+ logger.debug(
+ "retry policy {} decided to retry after {} for operation with
hashcode {},"
+ + " retry = {}",
+ manager_.getRetryPolicy().getClass().getSimpleName(),
+ action.getDelay(),
+ System.identityHashCode(round.getOperator()),
+ round.tryId);
+ round.tryId++;
executor_.schedule(
new Runnable() {
@Override
@@ -398,9 +412,14 @@ public class TableHandler extends Table {
call(round);
}
},
- nanoDelay,
+ action.getDelay().toNanos(),
TimeUnit.NANOSECONDS);
} else {
+ logger.debug(
+ "retry policy {} decided to fail for operation with hashcode {}," +
" retry = {}",
+ manager_.getRetryPolicy().getClass().getSimpleName(),
+ System.identityHashCode(round.getOperator()),
+ round.tryId);
// errno == ERR_UNKNOWN means the request has never attemptted to
contact any replica servers
// this may happen when we can't initialize a null replica session for a
long time
if (round.getOperator().rpc_error.errno ==
error_code.error_types.ERR_UNKNOWN) {
diff --git
a/java-client/src/test/java/org/apache/pegasus/client/retry/TestDefaultRetryPolicy.java
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestDefaultRetryPolicy.java
new file mode 100644
index 000000000..c5ea42d06
--- /dev/null
+++
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestDefaultRetryPolicy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+public class TestDefaultRetryPolicy {
+
+  @Test
+  public void testShouldRetry() {
+    DefaultRetryPolicy policy = new DefaultRetryPolicy(null);
+
+    // A deadline in the past must fail immediately.
+    long now = System.nanoTime();
+    RetryPolicy.RetryAction action = policy.shouldRetry(1, now - 100, Duration.ofMillis(100));
+    assertEquals(RetryPolicy.RetryDecision.FAIL, action.getDecision());
+
+    // With plenty of time left the delay is a third of the timeout, in milliseconds:
+    // 300ms -> 100ms (not 100ns).
+    now = System.nanoTime();
+    action = policy.shouldRetry(1, now + TimeUnit.MINUTES.toNanos(1), Duration.ofMillis(300));
+    assertEquals(RetryPolicy.RetryDecision.RETRY, action.getDecision());
+    assertEquals(Duration.ofMillis(100), action.getDelay());
+
+    // The delay must never push the retry past the deadline.
+    now = System.nanoTime();
+    action = policy.shouldRetry(1, now + TimeUnit.SECONDS.toNanos(1), Duration.ofSeconds(10));
+    assertEquals(RetryPolicy.RetryDecision.RETRY, action.getDecision());
+    assertThat(action.getDelay().toNanos(), lessThanOrEqualTo(TimeUnit.SECONDS.toNanos(1)));
+  }
+}
diff --git
a/java-client/src/test/java/org/apache/pegasus/client/retry/TestExponentialBackoffRetryPolicy.java
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestExponentialBackoffRetryPolicy.java
new file mode 100644
index 000000000..248b38972
--- /dev/null
+++
b/java-client/src/test/java/org/apache/pegasus/client/retry/TestExponentialBackoffRetryPolicy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pegasus.client.retry;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.pegasus.client.ClientOptions;
+import org.junit.jupiter.api.Test;
+
+public class TestExponentialBackoffRetryPolicy {
+
+  @Test
+  public void test() {
+    ClientOptions opts =
+        ClientOptions.builder().retryBaseInterval(Duration.ofMillis(10)).retryMaxTimes(200).build();
+    ExponentialBackoffRetryPolicy policy = new ExponentialBackoffRetryPolicy(opts);
+
+    // First retry: 1 x 10ms base interval (plus at most 1% jitter).
+    long now = System.nanoTime();
+    RetryPolicy.RetryAction action =
+        policy.shouldRetry(0, now + TimeUnit.MINUTES.toNanos(1), null);
+    assertEquals(RetryPolicy.RetryDecision.RETRY, action.getDecision());
+    assertThat(action.getDelay().toMillis(), both(greaterThan(0L)).and(lessThan(20L)));
+
+    // Second retry: 2 x 10ms.
+    now = System.nanoTime();
+    action = policy.shouldRetry(1, now + TimeUnit.MINUTES.toNanos(1), null);
+    assertEquals(RetryPolicy.RetryDecision.RETRY, action.getDecision());
+    assertThat(action.getDelay().toMillis(), both(greaterThan(10L)).and(lessThan(30L)));
+
+    // Past the end of the backoff table the last multiplier is reused: 200 x 10ms.
+    now = System.nanoTime();
+    action = policy.shouldRetry(100, now + TimeUnit.MINUTES.toNanos(1), null);
+    assertEquals(RetryPolicy.RetryDecision.RETRY, action.getDecision());
+    assertThat(action.getDelay().toMillis(), both(greaterThan(1500L)).and(lessThan(2500L)));
+
+    // Reaching retryMaxTimes fails even with time left.
+    now = System.nanoTime();
+    action = policy.shouldRetry(1000, now + TimeUnit.MINUTES.toNanos(1), null);
+    assertEquals(RetryPolicy.RetryDecision.FAIL, action.getDecision());
+    assertEquals("max retry times 200 reached", action.getReason());
+
+    // A deadline in the past fails even when retries remain.
+    now = System.nanoTime();
+    action = policy.shouldRetry(0, now - 100, null);
+    assertEquals(RetryPolicy.RetryDecision.FAIL, action.getDecision());
+    assertEquals("request deadline reached", action.getReason());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]