This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/curator.git
commit 04affff861f142881ebb60fce5d8d89568385ab5 Author: tison <[email protected]> AuthorDate: Mon May 4 12:51:29 2020 +0800 CURATOR-544: SessionFailedRetryPolicy --- .../main/java/org/apache/curator/RetryLoop.java | 32 ------------- .../java/org/apache/curator/RetryLoopImpl.java | 2 +- .../main/java/org/apache/curator/RetryPolicy.java | 25 +++++++++- .../apache/curator/SessionFailedRetryPolicy.java | 36 +++++++++++++++ .../java/org/apache/curator/TestRetryLoop.java | 53 ++++++++++++++++++++++ .../curator/framework/imps/CreateBuilderImpl.java | 5 +- .../framework/imps/CuratorFrameworkImpl.java | 5 +- .../curator/framework/imps/DeleteBuilderImpl.java | 2 +- .../framework/imps/RemoveWatchesBuilderImpl.java | 6 +-- 9 files changed, 122 insertions(+), 44 deletions(-) diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java index 1720290..070d9b3 100644 --- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java +++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java @@ -20,7 +20,6 @@ package org.apache.curator; import org.apache.curator.connection.ThreadLocalRetryLoop; import org.apache.curator.utils.ThreadUtils; -import org.apache.zookeeper.KeeperException; import java.util.concurrent.Callable; /** @@ -122,37 +121,6 @@ public abstract class RetryLoop public abstract void markComplete(); /** - * Utility - return true if the given Zookeeper result code is retry-able - * - * @param rc result code - * @return true/false - */ - public static boolean shouldRetry(int rc) - { - return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) || - (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) || - (rc == KeeperException.Code.SESSIONMOVED.intValue()) || - (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) || - (rc == -13); // KeeperException.Code.NEWCONFIGNOQUORUM.intValue()) - using hard coded value for ZK 3.4.x compatibility - } - - /** - * Utility - return true if the given exception is retry-able - * - * @param exception exception to check - * @return true/false - */ - public static boolean isRetryException(Throwable exception) - { - if ( exception instanceof KeeperException ) - { - KeeperException keeperException = (KeeperException)exception; - return shouldRetry(keeperException.code().intValue()); - } - return false; - } - - /** * Pass any caught exceptions here * * @param exception the exception diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java index bc1c244..d987f9f 100644 --- a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java +++ b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java @@ -66,7 +66,7 @@ class RetryLoopImpl extends RetryLoop public void takeException(Exception exception) throws Exception { boolean rethrow = true; - if ( RetryLoop.isRetryException(exception) ) + if ( retryPolicy.allowRetry(exception) ) { if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { diff --git a/curator-client/src/main/java/org/apache/curator/RetryPolicy.java b/curator-client/src/main/java/org/apache/curator/RetryPolicy.java index 6fca7e4..49f2e88 100644 --- a/curator-client/src/main/java/org/apache/curator/RetryPolicy.java +++ b/curator-client/src/main/java/org/apache/curator/RetryPolicy.java @@ -18,6 +18,8 @@ */ package org.apache.curator; +import org.apache.zookeeper.KeeperException; + /** * Abstracts the policy to use when retrying connections */ @@ -33,5 +35,26 @@ public interface RetryPolicy * @param sleeper use this to sleep - DO NOT call Thread.sleep * @return true/false */ - public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper); + boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper); + + /** + * Called when an operation has failed with a specific exception. This method + * should return true to make another attempt. + * + * @param exception the cause that this operation failed + * @return true/false + */ + default boolean allowRetry(Throwable exception) + { + if ( exception instanceof KeeperException) + { + final int rc = ((KeeperException) exception).code().intValue(); + return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) || + (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) || + (rc == KeeperException.Code.SESSIONMOVED.intValue()) || + (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) || + (rc == -13); // KeeperException.Code.NEWCONFIGNOQUORUM.intValue()) - using hard coded value for ZK 3.4.x compatibility + } + return false; + } } diff --git a/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java b/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java new file mode 100644 index 0000000..77ad7be --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/SessionFailedRetryPolicy.java @@ -0,0 +1,36 @@ +package org.apache.curator; + +import org.apache.zookeeper.KeeperException; + +/** + * {@link RetryPolicy} implementation that failed on session expired. + */ +public class SessionFailedRetryPolicy implements RetryPolicy +{ + + private final RetryPolicy delegatePolicy; + + public SessionFailedRetryPolicy(RetryPolicy delegatePolicy) + { + this.delegatePolicy = delegatePolicy; + } + + @Override + public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) + { + return delegatePolicy.allowRetry(retryCount, elapsedTimeMs, sleeper); + } + + @Override + public boolean allowRetry(Throwable exception) + { + if ( exception instanceof KeeperException.SessionExpiredException ) + { + return false; + } + else + { + return delegatePolicy.allowRetry(exception); + } + } +} diff --git a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java index 17bb91e..0922dff 100644 --- a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java +++ b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java @@ -22,7 +22,9 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.mockito.Mockito; import org.testng.Assert; @@ -162,4 +164,55 @@ public class TestRetryLoop extends BaseClassForTests Mockito.verify(sleeper, times(i + 1)).sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS); } } + + @Test + public void testRetryForeverWithSessionFailed() throws Exception + { + final Timing timing = new Timing(); + final RetryPolicy retryPolicy = new SessionFailedRetryPolicy(new RetryForever(1000)); + final CuratorZookeeperClient client = new CuratorZookeeperClient(server.getConnectString(), timing.session(), timing.connection(), null, retryPolicy); + client.start(); + + try + { + int loopCount = 0; + final RetryLoop retryLoop = client.newRetryLoop(); + while ( retryLoop.shouldContinue() ) + { + if ( ++loopCount > 1 ) + { + break; + } + + try + { + client.getZooKeeper().getTestable().injectSessionExpiration(); + client.getZooKeeper().create("/test", new byte[]{1,2,3}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + retryLoop.markComplete(); + } + catch ( Exception e ) + { + retryLoop.takeException(e); + } + } + + Assert.fail("Should failed with SessionExpiredException."); + } + catch ( Exception e ) + { + if ( e instanceof KeeperException ) + { + int rc = ((KeeperException) e).code().intValue(); + Assert.assertEquals(rc, KeeperException.Code.SESSIONEXPIRED.intValue()); + } + else + { + throw e; + } + } + finally + { + client.close(); + } + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index ee5c541..2ccc173 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -20,7 +20,6 @@ package org.apache.curator.framework.imps; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.curator.RetryLoop; @@ -36,14 +35,12 @@ import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -770,7 +767,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro } catch ( KeeperException e ) { - if ( !RetryLoop.isRetryException(e) ) + if ( !client.getZookeeperClient().getRetryPolicy().allowRetry(e) ) { throw e; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 1abfc28..e704f02 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -637,7 +637,8 @@ public class CuratorFrameworkImpl implements CuratorFramework boolean doQueueOperation = false; do { - if ( RetryLoop.shouldRetry(event.getResultCode()) ) + final KeeperException ke = KeeperException.create(event.getResultCode()); + if ( getZookeeperClient().getRetryPolicy().allowRetry(ke) ) { doQueueOperation = checkBackgroundRetry(operationAndData, event); break; @@ -901,7 +902,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { do { - if ( (operationAndData != null) && RetryLoop.isRetryException(e) ) + if ( (operationAndData != null) && getZookeeperClient().getRetryPolicy().allowRetry(e) ) { if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java index 4deaf70..3cc54cd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java @@ -300,7 +300,7 @@ public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<Str { ThreadUtils.checkInterrupted(e); //Only retry a guaranteed delete if it's a retryable error - if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed ) + if ( (client.getZookeeperClient().getRetryPolicy().allowRetry(e) || (e instanceof InterruptedException)) && guaranteed ) { client.getFailedDeleteManager().addFailedOperation(unfixedPath); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index 961d5f0..3c2c35d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -270,13 +270,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat } catch(Exception e) { - if( RetryLoop.isRetryException(e) && guaranteed ) + if ( client.getZookeeperClient().getRetryPolicy().allowRetry(e) && guaranteed ) { //Setup the guaranteed handler client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, finalNamespaceWatcher)); throw e; } - else if(e instanceof KeeperException.NoWatcherException && quietly) + else if (e instanceof KeeperException.NoWatcherException && quietly) { // ignore } @@ -349,4 +349,4 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat backgrounding.checkError(e, null); } } -} \ No newline at end of file +}
