This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 959c1ca CURATOR-559 (#346)
959c1ca is described below
commit 959c1ca34f6ebcb11370180dfafbd8e85320dcd2
Author: Jordan Zimmerman <[email protected]>
AuthorDate: Wed Feb 19 22:27:10 2020 -0500
CURATOR-559 (#346)
The retry loop mechanism ended up getting nested multiple times causing
exponential calls to the retry policy and violating a given policy's limits.
Use a thread local to mitigate this so that a retry loop is reused for nested
API calls, etc.
---
.../org/apache/curator/CuratorZookeeperClient.java | 2 +-
.../main/java/org/apache/curator/RetryLoop.java | 94 ++-----------
.../java/org/apache/curator/RetryLoopImpl.java | 100 +++++++++++++
.../StandardConnectionHandlingPolicy.java | 28 ++--
.../curator/connection/ThreadLocalRetryLoop.java | 156 +++++++++++++++++++++
.../java/org/apache/curator/TestEnsurePath.java | 4 +-
.../connection/TestThreadLocalRetryLoop.java | 118 ++++++++++++++++
7 files changed, 410 insertions(+), 92 deletions(-)
diff --git
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 7977541..167695f 100644
---
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -171,7 +171,7 @@ public class CuratorZookeeperClient implements Closeable
*/
public RetryLoop newRetryLoop()
{
- return new RetryLoop(retryPolicy.get(), tracer);
+ return new RetryLoopImpl(retryPolicy.get(), tracer);
}
/**
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 51df662..60b4a35 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -18,15 +18,8 @@
*/
package org.apache.curator;
-import org.apache.curator.drivers.EventTrace;
-import org.apache.curator.drivers.TracerDriver;
-import org.apache.curator.utils.DebugUtils;
import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
/**
* <p>Mechanism to perform an operation on Zookeeper that is safe against
@@ -56,34 +49,22 @@ import java.util.concurrent.atomic.AtomicReference;
* }
* }
* </pre>
+ *
+ * <p>
+ * Note: this is an {@code abstract class} instead of an {@code interface}
for historical reasons. It was originally a class
+ * and if it becomes an interface we risk {@link
java.lang.IncompatibleClassChangeError}s with clients.
+ * </p>
*/
-public class RetryLoop
+public abstract class RetryLoop
{
- private boolean isDone = false;
- private int retryCount = 0;
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final long startTimeMs = System.currentTimeMillis();
- private final RetryPolicy retryPolicy;
- private final AtomicReference<TracerDriver> tracer;
-
- private static final RetrySleeper sleeper = new RetrySleeper()
- {
- @Override
- public void sleepFor(long time, TimeUnit unit) throws
InterruptedException
- {
- unit.sleep(time);
- }
- };
-
/**
* Returns the default retry sleeper
*
* @return sleeper
*/
- public static RetrySleeper getDefaultRetrySleeper()
+ public static RetrySleeper getDefaultRetrySleeper()
{
- return sleeper;
+ return RetryLoopImpl.getRetrySleeper();
}
/**
@@ -95,34 +76,22 @@ public class RetryLoop
* @return procedure result
* @throws Exception any non-retriable errors
*/
- public static<T> T callWithRetry(CuratorZookeeperClient client,
Callable<T> proc) throws Exception
+ public static <T> T callWithRetry(CuratorZookeeperClient client,
Callable<T> proc) throws Exception
{
return client.getConnectionHandlingPolicy().callWithRetry(client,
proc);
}
- RetryLoop(RetryPolicy retryPolicy, AtomicReference<TracerDriver> tracer)
- {
- this.retryPolicy = retryPolicy;
- this.tracer = tracer;
- }
-
/**
* If true is returned, make an attempt at the operation
*
* @return true/false
*/
- public boolean shouldContinue()
- {
- return !isDone;
- }
+ public abstract boolean shouldContinue();
/**
* Call this when your operation has successfully completed
*/
- public void markComplete()
- {
- isDone = true;
- }
+ public abstract void markComplete();
/**
* Utility - return true if the given Zookeeper result code is retry-able
@@ -130,7 +99,7 @@ public class RetryLoop
* @param rc result code
* @return true/false
*/
- public static boolean shouldRetry(int rc)
+ public static boolean shouldRetry(int rc)
{
return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
(rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
@@ -145,11 +114,11 @@ public class RetryLoop
* @param exception exception to check
* @return true/false
*/
- public static boolean isRetryException(Throwable exception)
+ public static boolean isRetryException(Throwable exception)
{
if ( exception instanceof KeeperException )
{
- KeeperException keeperException = (KeeperException)exception;
+ KeeperException keeperException = (KeeperException)exception;
return shouldRetry(keeperException.code().intValue());
}
return false;
@@ -161,38 +130,5 @@ public class RetryLoop
* @param exception the exception
* @throws Exception if not retry-able or the retry policy returned
negative
*/
- public void takeException(Exception exception) throws Exception
- {
- boolean rethrow = true;
- if ( isRetryException(exception) )
- {
- if (
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
- {
- log.debug("Retry-able exception received", exception);
- }
-
- if ( retryPolicy.allowRetry(retryCount++,
System.currentTimeMillis() - startTimeMs, sleeper) )
- {
- new EventTrace("retries-allowed", tracer.get()).commit();
- if (
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
- {
- log.debug("Retrying operation");
- }
- rethrow = false;
- }
- else
- {
- new EventTrace("retries-disallowed", tracer.get()).commit();
- if (
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
- {
- log.debug("Retry policy not allowing retry");
- }
- }
- }
-
- if ( rethrow )
- {
- throw exception;
- }
- }
+ public abstract void takeException(Exception exception) throws Exception;
}
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
new file mode 100644
index 0000000..bc1c244
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator;
+
+import org.apache.curator.drivers.EventTrace;
+import org.apache.curator.drivers.TracerDriver;
+import org.apache.curator.utils.DebugUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicReference;
+
+class RetryLoopImpl extends RetryLoop
+{
+ private boolean isDone = false;
+ private int retryCount = 0;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final long startTimeMs = System.currentTimeMillis();
+ private final RetryPolicy retryPolicy;
+ private final AtomicReference<TracerDriver> tracer;
+
+ private static final RetrySleeper sleeper = (time, unit) ->
unit.sleep(time);
+
+ RetryLoopImpl(RetryPolicy retryPolicy, AtomicReference<TracerDriver>
tracer)
+ {
+ this.retryPolicy = retryPolicy;
+ this.tracer = tracer;
+ }
+
+ static RetrySleeper getRetrySleeper()
+ {
+ return sleeper;
+ }
+
+
+ @Override
+ public boolean shouldContinue()
+ {
+ return !isDone;
+ }
+
+ @Override
+ public void markComplete()
+ {
+ isDone = true;
+ }
+
+ @Override
+ public void takeException(Exception exception) throws Exception
+ {
+ boolean rethrow = true;
+ if ( RetryLoop.isRetryException(exception) )
+ {
+ if (
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+ {
+ log.debug("Retry-able exception received", exception);
+ }
+
+ if ( retryPolicy.allowRetry(retryCount++,
System.currentTimeMillis() - startTimeMs, sleeper) )
+ {
+ new EventTrace("retries-allowed", tracer.get()).commit();
+ if (
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+ {
+ log.debug("Retrying operation");
+ }
+ rethrow = false;
+ }
+ else
+ {
+ new EventTrace("retries-disallowed", tracer.get()).commit();
+ if (
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+ {
+ log.debug("Retry policy not allowing retry");
+ }
+ }
+ }
+
+ if ( rethrow )
+ {
+ throw exception;
+ }
+ }
+}
diff --git
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index 41b342c..681fc84 100644
---
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -56,20 +56,28 @@ public class StandardConnectionHandlingPolicy implements
ConnectionHandlingPolic
client.internalBlockUntilConnectedOrTimedOut();
T result = null;
- RetryLoop retryLoop = client.newRetryLoop();
- while ( retryLoop.shouldContinue() )
+ ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop();
+ RetryLoop retryLoop =
threadLocalRetryLoop.getRetryLoop(client::newRetryLoop);
+ try
{
- try
+ while ( retryLoop.shouldContinue() )
{
- result = proc.call();
- retryLoop.markComplete();
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- retryLoop.takeException(e);
+ try
+ {
+ result = proc.call();
+ retryLoop.markComplete();
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ retryLoop.takeException(e);
+ }
}
}
+ finally
+ {
+ threadLocalRetryLoop.release();
+ }
return result;
}
diff --git
a/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
new file mode 100644
index 0000000..225b967
--- /dev/null
+++
b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.connection;
+
+import org.apache.curator.RetryLoop;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * <p>
+ * Retry loops can easily end up getting nested which can cause
exponential calls of the retry policy
+ * (see https://issues.apache.org/jira/browse/CURATOR-559). This utility
works around that by using
+ * an internal ThreadLocal to hold a retry loop. E.g. if the retry loop
fails anywhere in the chain
+ * of nested calls it will fail for the rest of the nested calls instead of being retried again at each level.
+ * </p>
+ *
+ * <p>
+ * Example usage:
+ *
+ * <code><pre>
+ * ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop();
+ * RetryLoop retryLoop =
threadLocalRetryLoop.getRetryLoop(client::newRetryLoop);
+ * try
+ * {
+ * while ( retryLoop.shouldContinue() )
+ * {
+ * try
+ * {
+ * // do work
+ * retryLoop.markComplete();
+ * }
+ * catch ( Exception e )
+ * {
+ * ThreadUtils.checkInterrupted(e);
+ * retryLoop.takeException(e);
+ * }
+ * }
+ * }
+ * finally
+ * {
+ * threadLocalRetryLoop.release();
+ * }
+ * </pre></code>
+ * </p>
+ */
+public class ThreadLocalRetryLoop
+{
+ private static final Logger log =
LoggerFactory.getLogger(ThreadLocalRetryLoop.class);
+ private static final ThreadLocal<Entry> threadLocal = new ThreadLocal<>();
+
+ private static class Entry
+ {
+ private final RetryLoop retryLoop;
+ private int counter;
+
+ Entry(RetryLoop retryLoop)
+ {
+ this.retryLoop = retryLoop;
+ }
+ }
+
+ private static class WrappedRetryLoop extends RetryLoop
+ {
+ private final RetryLoop retryLoop;
+ private Exception takenException;
+
+ public WrappedRetryLoop(RetryLoop retryLoop)
+ {
+ this.retryLoop = retryLoop;
+ }
+
+ @Override
+ public boolean shouldContinue()
+ {
+ return retryLoop.shouldContinue() && (takenException == null);
+ }
+
+ @Override
+ public void markComplete()
+ {
+ retryLoop.markComplete();
+ }
+
+ @Override
+ public void takeException(Exception exception) throws Exception
+ {
+ if ( takenException != null )
+ {
+ if ( exception.getClass() != takenException.getClass() )
+ {
+ log.error("Multiple exceptions in retry loop", exception);
+ }
+ throw takenException;
+ }
+ try
+ {
+ retryLoop.takeException(exception);
+ }
+ catch ( Exception e )
+ {
+ takenException = e;
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Call to get the current retry loop. If there isn't one, one is allocated
+ * via {@code newRetryLoopSupplier}.
+ *
+ * @param newRetryLoopSupplier supply a new retry loop when needed.
Normally you should use {@link
org.apache.curator.CuratorZookeeperClient#newRetryLoop()}
+ * @return retry loop to use
+ */
+ public RetryLoop getRetryLoop(Supplier<RetryLoop> newRetryLoopSupplier)
+ {
+ Entry entry = threadLocal.get();
+ if ( entry == null )
+ {
+ entry = new Entry(new
WrappedRetryLoop(newRetryLoopSupplier.get()));
+ threadLocal.set(entry);
+ }
+ ++entry.counter;
+ return entry.retryLoop;
+ }
+
+ /**
+ * Must be called to release the retry loop. See {@link
org.apache.curator.connection.StandardConnectionHandlingPolicy#callWithRetry(org.apache.curator.CuratorZookeeperClient,
java.util.concurrent.Callable)}
+ * for an example usage.
+ */
+ public void release()
+ {
+ Entry entry = Objects.requireNonNull(threadLocal.get(), "No retry loop
was set - unbalanced call to release()");
+ if ( --entry.counter <= 0 )
+ {
+ threadLocal.remove();
+ }
+ }
+}
diff --git
a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
index ba37d60..8eed2bf 100644
--- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
+++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
@@ -51,7 +51,7 @@ public class TestEnsurePath
ZooKeeper client = mock(ZooKeeper.class,
Mockito.RETURNS_MOCKS);
CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
RetryPolicy retryPolicy = new RetryOneTime(1);
- RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
+ RetryLoop retryLoop = new RetryLoopImpl(retryPolicy,
null);
when(curator.getConnectionHandlingPolicy()).thenReturn(new
StandardConnectionHandlingPolicy());
when(curator.getZooKeeper()).thenReturn(client);
when(curator.getRetryPolicy()).thenReturn(retryPolicy);
@@ -76,7 +76,7 @@ public class TestEnsurePath
{
ZooKeeper client = mock(ZooKeeper.class,
Mockito.RETURNS_MOCKS);
RetryPolicy retryPolicy = new RetryOneTime(1);
- RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
+ RetryLoop retryLoop = new RetryLoopImpl(retryPolicy,
null);
final CuratorZookeeperClient curator =
mock(CuratorZookeeperClient.class);
when(curator.getConnectionHandlingPolicy()).thenReturn(new
StandardConnectionHandlingPolicy());
when(curator.getZooKeeper()).thenReturn(client);
diff --git
a/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java
b/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java
new file mode 100644
index 0000000..785c2c2
--- /dev/null
+++
b/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.connection;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.RetrySleeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestThreadLocalRetryLoop extends CuratorTestBase
+{
+ private static final int retryCount = 4;
+
+ @Test(description = "Check for fix for CURATOR-559")
+ public void testRecursingRetry() throws Exception
+ {
+ AtomicInteger count = new AtomicInteger();
+ try (CuratorFramework client = newClient(count))
+ {
+ prep(client);
+ doLock(client);
+ Assert.assertEquals(count.get(), retryCount + 1); // Curator's
retry policy has been off by 1 since inception - we might consider fixing it
someday
+ }
+ }
+
+ @Test(description = "Check for fix for CURATOR-559 with multiple threads")
+ public void testThreadedRecursingRetry() throws Exception
+ {
+ final int threadQty = 4;
+ ExecutorService executorService =
Executors.newFixedThreadPool(threadQty);
+ AtomicInteger count = new AtomicInteger();
+ try (CuratorFramework client = newClient(count))
+ {
+ prep(client);
+ for ( int i = 0; i < threadQty; ++i )
+ {
+ executorService.submit(() -> doLock(client));
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(timing.milliseconds(),
TimeUnit.MILLISECONDS);
+ Assert.assertEquals(count.get(), threadQty * (retryCount + 1));
// Curator's retry policy has been off by 1 since inception - we might consider
fixing it someday
+ }
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testBadReleaseWithNoGet()
+ {
+ ThreadLocalRetryLoop retryLoopStack = new ThreadLocalRetryLoop();
+ retryLoopStack.release();
+ }
+
+ private CuratorFramework newClient(AtomicInteger count)
+ {
+ RetryPolicy retryPolicy = makeRetryPolicy(count);
+ return CuratorFrameworkFactory.newClient(server.getConnectString(),
100, 100, retryPolicy);
+ }
+
+ private void prep(CuratorFramework client) throws Exception
+ {
+ client.start();
+ client.create().forPath("/test");
+ server.stop();
+ }
+
+ private Void doLock(CuratorFramework client) throws Exception
+ {
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client,
"/test/lock");
+ try
+ {
+ lock.readLock().acquire();
+ Assert.fail("Should have thrown an exception");
+ }
+ catch ( KeeperException ignore )
+ {
+ // correct
+ }
+ return null;
+ }
+
+ private RetryPolicy makeRetryPolicy(AtomicInteger count)
+ {
+ return new RetryNTimes(retryCount, 1)
+ {
+ @Override
+ public boolean allowRetry(int retryCount, long elapsedTimeMs,
RetrySleeper sleeper)
+ {
+ count.incrementAndGet();
+ return super.allowRetry(retryCount, elapsedTimeMs, sleeper);
+ }
+ };
+ }
+}