Repository: incubator-ratis Updated Branches: refs/heads/master 09b099c71 -> 2ce10dc1a
RATIS-310. Add support for Retry Policy in Ratis. Contributed by Shashikant Banerjee Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2ce10dc1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2ce10dc1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2ce10dc1 Branch: refs/heads/master Commit: 2ce10dc1a88b7657c7d63e02774459afd8724c4d Parents: 09b099c Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Tue Sep 11 11:04:22 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Tue Sep 11 11:04:22 2018 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 13 +- .../ratis/client/impl/ClientImplUtils.java | 8 +- .../ratis/client/impl/RaftClientImpl.java | 22 +-- .../org/apache/ratis/retry/RetryPolicies.java | 141 +++++++++++++++++++ .../org/apache/ratis/retry/RetryPolicy.java | 40 ++++++ .../java/org/apache/ratis/TestRetryPolicy.java | 42 ++++++ 6 files changed, 254 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index ead9155..d28d50a 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -22,6 +22,8 @@ import org.apache.ratis.client.impl.ClientImplUtils; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; import org.slf4j.Logger; @@ -111,6 +113,7 @@ public interface RaftClient extends Closeable { private RaftPeerId leaderId; private RaftProperties properties; private Parameters parameters; + private RetryPolicy retryPolicy; private Builder() {} @@ -126,11 +129,13 @@ public interface RaftClient extends Closeable { clientRpc = factory.newRaftClientRpc(clientId, properties); } } + retryPolicy = + retryPolicy == null ? RetryPolicies.RETRY_FOREVER : retryPolicy; return ClientImplUtils.newRaftClient(clientId, Objects.requireNonNull(group, "The 'group' field is not initialized."), leaderId, Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."), - properties); + properties, retryPolicy); } /** Set {@link RaftClient} ID. */ @@ -168,5 +173,11 @@ public interface RaftClient extends Closeable { this.parameters = parameters; return this; } + + /** Set {@link RetryPolicy}. */ + public Builder setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index d813650..f2564a9 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -21,6 +21,8 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; @@ -28,7 +30,9 @@ import org.apache.ratis.protocol.RaftPeerId; /** Client utilities for internal use. */ public class ClientImplUtils { public static RaftClient newRaftClient(ClientId clientId, RaftGroup group, - RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties) { - return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties); + RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties, + RetryPolicy retryPolicy) { + return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties, + retryPolicy); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 9419c7f..d6c9a27 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -20,6 +20,8 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; @@ -91,6 +93,7 @@ final class RaftClientImpl implements RaftClient { private final Collection<RaftPeer> peers; private final RaftGroupId groupId; private final TimeDuration retryInterval; + private final RetryPolicy retryPolicy; private volatile RaftPeerId leaderId; @@ -101,7 +104,7 @@ final class RaftClientImpl implements RaftClient { private final Semaphore asyncRequestSemaphore; RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, - RaftClientRpc clientRpc, RaftProperties properties) { + RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) { this.clientId = clientId; this.clientRpc = clientRpc; this.peers = new ConcurrentLinkedQueue<>(group.getPeers()); @@ -109,6 +112,8 @@ final class RaftClientImpl implements RaftClient { this.leaderId = leaderId != null? leaderId : !peers.isEmpty()? peers.iterator().next().getId(): null; this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties); + Preconditions.assertTrue(retryPolicy != null, "retry policy can't be null"); + this.retryPolicy = retryPolicy; asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties)); scheduler = TimeoutScheduler.newInstance(RaftClientConfigKeys.Async.schedulerThreads(properties)); @@ -258,20 +263,19 @@ final class RaftClientImpl implements RaftClient { private RaftClientReply sendRequestWithRetry( Supplier<RaftClientRequest> supplier) throws InterruptedIOException, StateMachineException, GroupMismatchException { - for(;;) { + for(int retryCount = 0;; retryCount++) { final RaftClientRequest request = supplier.get(); final RaftClientReply reply = sendRequest(request); if (reply != null) { return reply; } - - // sleep and then retry + if (!retryPolicy.shouldRetry(retryCount)) { + return null; + } try { - retryInterval.sleep(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw IOUtils.toInterruptedIOException( - "Interrupted when sending " + request, ie); + retryPolicy.getSleepTime().sleep(); + } catch (InterruptedException e) { + throw new InterruptedIOException("retry policy=" + retryPolicy); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java new file mode 100644 index 0000000..4505c8d --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.retry; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.ratis.util.TimeDuration; + +import java.util.concurrent.TimeUnit; + +/** + * A collection of {@link RetryPolicy} implementations + */ +public class RetryPolicies { + /** + * Keep retrying forever. + */ + public static final RetryPolicy RETRY_FOREVER = new RetryForever(); + + /** + * Keep trying a limited number of times, waiting a fixed time between attempts, + * and then fail by re-throwing the exception. + */ + public static final RetryPolicy retryUpToMaximumCountWithFixedSleep( + int maxRetries, TimeDuration sleepTime) { + return new RetryUpToMaximumCountWithFixedSleep(maxRetries, sleepTime); + } + + + static class RetryForever implements RetryPolicy { + @Override + public boolean shouldRetry(int retryCount) { + return true; + } + + @Override + public TimeDuration getSleepTime() { + return TimeDuration.valueOf(0, TimeUnit.MILLISECONDS); + } + } + + /** + * Retry up to maxRetries. + * The actual sleep time of the n-th retry is f(n, sleepTime), + * where f is a function provided by the subclass implementation. + * + * The object of the subclasses should be immutable; + * otherwise, the subclass must override hashCode(), equals(..) and toString(). + */ + static abstract class RetryLimited implements RetryPolicy { + private final int maxRetries; + private final TimeDuration sleepTime; + + private String myString; + + RetryLimited(int maxRetries, TimeDuration sleepTime) { + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0"); + } + if (sleepTime.isNegative()) { + throw new IllegalArgumentException( + "sleepTime = " + sleepTime.getDuration() + " < 0"); + } + + this.maxRetries = maxRetries; + this.sleepTime = sleepTime; + } + + @Override + public TimeDuration getSleepTime() { + return sleepTime; + } + + public int getMaxRetries() { + return maxRetries; + } + + @Override + public boolean shouldRetry(int retryCount) { + if (retryCount >= maxRetries) { + return false; + } else { + return true; + } + } + + protected String getReason() { + return constructReasonString(maxRetries); + } + + @VisibleForTesting + public static String constructReasonString(int retries) { + return "retries get failed due to exceeded maximum allowed retries " + + "number: " + retries; + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(final Object that) { + if (this == that) { + return true; + } else if (that == null || this.getClass() != that.getClass()) { + return false; + } + return this.toString().equals(that.toString()); + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries + + ", sleepTime=" + sleepTime + ")"; + } + return myString; + } + } + + static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited { + public RetryUpToMaximumCountWithFixedSleep(int maxRetries, TimeDuration sleepTime) { + super(maxRetries, sleepTime); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java new file mode 100644 index 0000000..fe4ee80 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.retry; + +import org.apache.ratis.util.TimeDuration; + +/** + * Policy abstract for retrying. + */ +public interface RetryPolicy { + + /** + * Determines whether it is supposed to retry the connection if the operation + * fails for some reason. + * + * @param retryCount The number of times retried so far + * @return true if it has to make another attempt, otherwise, false + */ + boolean shouldRetry(int retryCount); + + /** + * Returns the time duration for sleep in between the retries. + */ + TimeDuration getSleepTime(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java b/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java new file mode 100644 index 0000000..d481003 --- /dev/null +++ b/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.util.TimeDuration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + + + +public class TestRetryPolicy { + + @Test + public void testRetryMultipleTimesWithFixedSleep() { + RetryPolicy retryPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(2, + TimeDuration.valueOf(1000L, TimeUnit.MILLISECONDS)); + boolean shouldRetry = retryPolicy.shouldRetry(1); + Assert.assertTrue(shouldRetry); + Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration()); + Assert.assertFalse(retryPolicy.shouldRetry(2)); + } +} \ No newline at end of file
