This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-559-fix-nested-retry-loops
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 1d70a6afdabba4f0a0e75ba3e4b2be479b62dbbb
Author: randgalt <[email protected]>
AuthorDate: Tue Feb 18 09:31:45 2020 -0500

    CURATOR-559
    
    The retry loop mechanism ended up getting nested multiple times, causing 
an exponential number of calls to the retry policy and violating the given 
policy's limits. Use a thread local to mitigate this so that a single retry 
loop is reused for nested API calls, etc.
---
 .../org/apache/curator/CuratorZookeeperClient.java |   2 +-
 .../main/java/org/apache/curator/RetryLoop.java    | 112 +++--------------
 .../java/org/apache/curator/RetryLoopImpl.java     | 100 +++++++++++++++
 .../StandardConnectionHandlingPolicy.java          |  28 +++--
 .../curator/connection/ThreadLocalRetryLoop.java   | 138 +++++++++++++++++++++
 .../java/org/apache/curator/TestEnsurePath.java    |   4 +-
 .../connection/TestThreadLocalRetryLoop.java       | 100 +++++++++++++++
 7 files changed, 374 insertions(+), 110 deletions(-)

diff --git 
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java 
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index 7977541..167695f 100644
--- 
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ 
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -171,7 +171,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public RetryLoop newRetryLoop()
     {
-        return new RetryLoop(retryPolicy.get(), tracer);
+        return new RetryLoopImpl(retryPolicy.get(), tracer);
     }
 
     /**
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java 
b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 51df662..8eeb1d1 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -1,32 +1,7 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.curator;
 
-import org.apache.curator.drivers.EventTrace;
-import org.apache.curator.drivers.TracerDriver;
-import org.apache.curator.utils.DebugUtils;
 import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>Mechanism to perform an operation on Zookeeper that is safe against
@@ -56,34 +31,22 @@ import java.util.concurrent.atomic.AtomicReference;
  *     }
  * }
  * </pre>
+ *
+ * <p>
+ *     Note: this is an {@code abstract class} instead of an {@code interface} 
for historical reasons. It was originally a class
+ *     and if it becomes an interface we risk {@link 
java.lang.IncompatibleClassChangeError}s with clients.
+ * </p>
  */
-public class RetryLoop
+public abstract class RetryLoop
 {
-    private boolean         isDone = false;
-    private int             retryCount = 0;
-
-    private final Logger            log = LoggerFactory.getLogger(getClass());
-    private final long              startTimeMs = System.currentTimeMillis();
-    private final RetryPolicy       retryPolicy;
-    private final AtomicReference<TracerDriver>     tracer;
-
-    private static final RetrySleeper  sleeper = new RetrySleeper()
-    {
-        @Override
-        public void sleepFor(long time, TimeUnit unit) throws 
InterruptedException
-        {
-            unit.sleep(time);
-        }
-    };
-
     /**
      * Returns the default retry sleeper
      *
      * @return sleeper
      */
-    public static RetrySleeper      getDefaultRetrySleeper()
+    public static RetrySleeper getDefaultRetrySleeper()
     {
-        return sleeper;
+        return RetryLoopImpl.getRetrySleeper();
     }
 
     /**
@@ -95,34 +58,22 @@ public class RetryLoop
      * @return procedure result
      * @throws Exception any non-retriable errors
      */
-    public static<T> T      callWithRetry(CuratorZookeeperClient client, 
Callable<T> proc) throws Exception
+    public static <T> T callWithRetry(CuratorZookeeperClient client, 
Callable<T> proc) throws Exception
     {
         return client.getConnectionHandlingPolicy().callWithRetry(client, 
proc);
     }
 
-    RetryLoop(RetryPolicy retryPolicy, AtomicReference<TracerDriver> tracer)
-    {
-        this.retryPolicy = retryPolicy;
-        this.tracer = tracer;
-    }
-
     /**
      * If true is returned, make an attempt at the operation
      *
      * @return true/false
      */
-    public boolean      shouldContinue()
-    {
-        return !isDone;
-    }
+    public abstract boolean shouldContinue();
 
     /**
      * Call this when your operation has successfully completed
      */
-    public void     markComplete()
-    {
-        isDone = true;
-    }
+    public abstract void markComplete();
 
     /**
      * Utility - return true if the given Zookeeper result code is retry-able
@@ -130,7 +81,7 @@ public class RetryLoop
      * @param rc result code
      * @return true/false
      */
-    public static boolean      shouldRetry(int rc)
+    public static boolean shouldRetry(int rc)
     {
         return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
             (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
@@ -145,11 +96,11 @@ public class RetryLoop
      * @param exception exception to check
      * @return true/false
      */
-    public static boolean      isRetryException(Throwable exception)
+    public static boolean isRetryException(Throwable exception)
     {
         if ( exception instanceof KeeperException )
         {
-            KeeperException     keeperException = (KeeperException)exception;
+            KeeperException keeperException = (KeeperException)exception;
             return shouldRetry(keeperException.code().intValue());
         }
         return false;
@@ -161,38 +112,5 @@ public class RetryLoop
      * @param exception the exception
      * @throws Exception if not retry-able or the retry policy returned 
negative
      */
-    public void         takeException(Exception exception) throws Exception
-    {
-        boolean     rethrow = true;
-        if ( isRetryException(exception) )
-        {
-            if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
-            {
-                log.debug("Retry-able exception received", exception);
-            }
-
-            if ( retryPolicy.allowRetry(retryCount++, 
System.currentTimeMillis() - startTimeMs, sleeper) )
-            {
-                new EventTrace("retries-allowed", tracer.get()).commit();
-                if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
-                {
-                    log.debug("Retrying operation");
-                }
-                rethrow = false;
-            }
-            else
-            {
-                new EventTrace("retries-disallowed", tracer.get()).commit();
-                if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
-                {
-                    log.debug("Retry policy not allowing retry");
-                }
-            }
-        }
-
-        if ( rethrow )
-        {
-            throw exception;
-        }
-    }
+    public abstract void takeException(Exception exception) throws Exception;
 }
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java 
b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
new file mode 100644
index 0000000..bc1c244
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator;
+
+import org.apache.curator.drivers.EventTrace;
+import org.apache.curator.drivers.TracerDriver;
+import org.apache.curator.utils.DebugUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicReference;
+
+class RetryLoopImpl extends RetryLoop
+{
+    private boolean isDone = false;
+    private int retryCount = 0;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final long startTimeMs = System.currentTimeMillis();
+    private final RetryPolicy retryPolicy;
+    private final AtomicReference<TracerDriver> tracer;
+
+    private static final RetrySleeper sleeper = (time, unit) -> 
unit.sleep(time);
+
+    RetryLoopImpl(RetryPolicy retryPolicy, AtomicReference<TracerDriver> 
tracer)
+    {
+        this.retryPolicy = retryPolicy;
+        this.tracer = tracer;
+    }
+
+    static RetrySleeper getRetrySleeper()
+    {
+        return sleeper;
+    }
+
+
+    @Override
+    public boolean shouldContinue()
+    {
+        return !isDone;
+    }
+
+    @Override
+    public void markComplete()
+    {
+        isDone = true;
+    }
+
+    @Override
+    public void takeException(Exception exception) throws Exception
+    {
+        boolean rethrow = true;
+        if ( RetryLoop.isRetryException(exception) )
+        {
+            if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+            {
+                log.debug("Retry-able exception received", exception);
+            }
+
+            if ( retryPolicy.allowRetry(retryCount++, 
System.currentTimeMillis() - startTimeMs, sleeper) )
+            {
+                new EventTrace("retries-allowed", tracer.get()).commit();
+                if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+                {
+                    log.debug("Retrying operation");
+                }
+                rethrow = false;
+            }
+            else
+            {
+                new EventTrace("retries-disallowed", tracer.get()).commit();
+                if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+                {
+                    log.debug("Retry policy not allowing retry");
+                }
+            }
+        }
+
+        if ( rethrow )
+        {
+            throw exception;
+        }
+    }
+}
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index 41b342c..681fc84 100644
--- 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -56,20 +56,28 @@ public class StandardConnectionHandlingPolicy implements 
ConnectionHandlingPolic
         client.internalBlockUntilConnectedOrTimedOut();
 
         T result = null;
-        RetryLoop retryLoop = client.newRetryLoop();
-        while ( retryLoop.shouldContinue() )
+        ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop();
+        RetryLoop retryLoop = 
threadLocalRetryLoop.getRetryLoop(client::newRetryLoop);
+        try
         {
-            try
+            while ( retryLoop.shouldContinue() )
             {
-                result = proc.call();
-                retryLoop.markComplete();
-            }
-            catch ( Exception e )
-            {
-                ThreadUtils.checkInterrupted(e);
-                retryLoop.takeException(e);
+                try
+                {
+                    result = proc.call();
+                    retryLoop.markComplete();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    retryLoop.takeException(e);
+                }
             }
         }
+        finally
+        {
+            threadLocalRetryLoop.release();
+        }
 
         return result;
     }
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
 
b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
new file mode 100644
index 0000000..6fe1da0
--- /dev/null
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java
@@ -0,0 +1,138 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.RetryLoop;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * <p>
+ *     Retry loops can easily end up getting nested which can cause 
exponential calls of the retry policy
+ *     (see https://issues.apache.org/jira/browse/CURATOR-559). This utility 
works around that by using
+ *     an internal ThreadLocal to hold a retry loop. That is, once the retry loop fails anywhere in the chain
+ *     of nested calls, it fails for the rest of the nested calls as well.
+ * </p>
+ *
+ * <p>
+ *     Example usage:
+ *
+ * <code><pre>
+ * ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop();
+ * RetryLoop retryLoop = 
threadLocalRetryLoop.getRetryLoop(client::newRetryLoop);
+ * try
+ * {
+ *     while ( retryLoop.shouldContinue() )
+ *     {
+ *         try
+ *         {
+ *             // do work
+ *             retryLoop.markComplete();
+ *         }
+ *         catch ( Exception e )
+ *         {
+ *             ThreadUtils.checkInterrupted(e);
+ *             retryLoop.takeException(e);
+ *         }
+ *     }
+ * }
+ * finally
+ * {
+ *     threadLocalRetryLoop.release();
+ * }
+ * </pre></code>
+ * </p>
+ */
+public class ThreadLocalRetryLoop
+{
+    private static final Logger log = 
LoggerFactory.getLogger(ThreadLocalRetryLoop.class);
+    private static final ThreadLocal<Entry> threadLocal = new ThreadLocal<>();
+
+    private static class Entry
+    {
+        private final RetryLoop retryLoop;
+        private int counter;
+
+        Entry(RetryLoop retryLoop)
+        {
+            this.retryLoop = retryLoop;
+        }
+    }
+
+    private static class WrappedRetryLoop extends RetryLoop
+    {
+        private final RetryLoop retryLoop;
+        private Exception takenException;
+
+        public WrappedRetryLoop(RetryLoop retryLoop)
+        {
+            this.retryLoop = retryLoop;
+        }
+
+        @Override
+        public boolean shouldContinue()
+        {
+            return retryLoop.shouldContinue() && (takenException == null);
+        }
+
+        @Override
+        public void markComplete()
+        {
+            retryLoop.markComplete();
+        }
+
+        @Override
+        public void takeException(Exception exception) throws Exception
+        {
+            if ( takenException != null )
+            {
+                if ( exception.getClass() != takenException.getClass() )
+                {
+                    log.error("Multiple exceptions in retry loop", exception);
+                }
+                throw takenException;
+            }
+            try
+            {
+                retryLoop.takeException(exception);
+            }
+            catch ( Exception e )
+            {
+                takenException = e;
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Call to get the current retry loop. If there isn't one, one is allocated
+     * via {@code newRetryLoopSupplier}.
+     *
+     * @param newRetryLoopSupplier supply a new retry loop when needed. 
Normally you should use {@link 
org.apache.curator.CuratorZookeeperClient#newRetryLoop()}
+     * @return retry loop to use
+     */
+    public RetryLoop getRetryLoop(Supplier<RetryLoop> newRetryLoopSupplier)
+    {
+        Entry entry = threadLocal.get();
+        if ( entry == null )
+        {
+            entry = new Entry(new 
WrappedRetryLoop(newRetryLoopSupplier.get()));
+            threadLocal.set(entry);
+        }
+        ++entry.counter;
+        return entry.retryLoop;
+    }
+
+    /**
+     * Must be called to release the retry loop. See {@link 
org.apache.curator.connection.StandardConnectionHandlingPolicy#callWithRetry(org.apache.curator.CuratorZookeeperClient,
 java.util.concurrent.Callable)}
+     * for an example usage.
+     */
+    public void release()
+    {
+        Entry entry = Objects.requireNonNull(threadLocal.get(), "No retry loop 
was set - unbalanced call to release()");
+        if ( --entry.counter <= 0 )
+        {
+            threadLocal.remove();
+        }
+    }
+}
diff --git 
a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java 
b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
index ba37d60..8eed2bf 100644
--- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
+++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
@@ -51,7 +51,7 @@ public class TestEnsurePath
         ZooKeeper               client = mock(ZooKeeper.class, 
Mockito.RETURNS_MOCKS);
         CuratorZookeeperClient  curator = mock(CuratorZookeeperClient.class);
         RetryPolicy             retryPolicy = new RetryOneTime(1);
-        RetryLoop               retryLoop = new RetryLoop(retryPolicy, null);
+        RetryLoop               retryLoop = new RetryLoopImpl(retryPolicy, 
null);
         when(curator.getConnectionHandlingPolicy()).thenReturn(new 
StandardConnectionHandlingPolicy());
         when(curator.getZooKeeper()).thenReturn(client);
         when(curator.getRetryPolicy()).thenReturn(retryPolicy);
@@ -76,7 +76,7 @@ public class TestEnsurePath
     {
         ZooKeeper               client = mock(ZooKeeper.class, 
Mockito.RETURNS_MOCKS);
         RetryPolicy             retryPolicy = new RetryOneTime(1);
-        RetryLoop               retryLoop = new RetryLoop(retryPolicy, null);
+        RetryLoop               retryLoop = new RetryLoopImpl(retryPolicy, 
null);
         final CuratorZookeeperClient  curator = 
mock(CuratorZookeeperClient.class);
         when(curator.getConnectionHandlingPolicy()).thenReturn(new 
StandardConnectionHandlingPolicy());
         when(curator.getZooKeeper()).thenReturn(client);
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java
 
b/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java
new file mode 100644
index 0000000..1b34032
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java
@@ -0,0 +1,100 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.RetrySleeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestThreadLocalRetryLoop extends CuratorTestBase
+{
+    private static final int retryCount = 4;
+
+    @Test(description = "Check for fix for CURATOR-559")
+    public void testRecursingRetry() throws Exception
+    {
+        AtomicInteger count = new AtomicInteger();
+        try (CuratorFramework client = newClient(count))
+        {
+            prep(client);
+            doLock(client);
+            Assert.assertEquals(count.get(), retryCount + 1);    // Curator's 
retry policy has been off by 1 since inception - we might consider fixing it 
someday
+        }
+    }
+
+    @Test(description = "Check for fix for CURATOR-559 with multiple threads")
+    public void testThreadedRecursingRetry() throws Exception
+    {
+        final int threadQty = 4;
+        ExecutorService executorService = 
Executors.newFixedThreadPool(threadQty);
+        AtomicInteger count = new AtomicInteger();
+        try (CuratorFramework client = newClient(count))
+        {
+            prep(client);
+            for ( int i = 0; i < threadQty; ++i )
+            {
+                executorService.submit(() -> doLock(client));
+            }
+            executorService.shutdown();
+            executorService.awaitTermination(timing.milliseconds(), 
TimeUnit.MILLISECONDS);
+            Assert.assertEquals(count.get(), threadQty * (retryCount + 1));    
// Curator's retry policy has been off by 1 since inception - we might consider 
fixing it someday
+        }
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testBadReleaseWithNoGet()
+    {
+        ThreadLocalRetryLoop retryLoopStack = new ThreadLocalRetryLoop();
+        retryLoopStack.release();
+    }
+
+    private CuratorFramework newClient(AtomicInteger count)
+    {
+        RetryPolicy retryPolicy = makeRetryPolicy(count);
+        return CuratorFrameworkFactory.newClient(server.getConnectString(), 
100, 100, retryPolicy);
+    }
+
+    private void prep(CuratorFramework client) throws Exception
+    {
+        client.start();
+        client.create().forPath("/test");
+        server.stop();
+    }
+
+    private Void doLock(CuratorFramework client) throws Exception
+    {
+        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, 
"/test/lock");
+        try
+        {
+            lock.readLock().acquire();
+            Assert.fail("Should have thrown an exception");
+        }
+        catch ( KeeperException ignore )
+        {
+            // correct
+        }
+        return null;
+    }
+
+    private RetryPolicy makeRetryPolicy(AtomicInteger count)
+    {
+        return new RetryNTimes(retryCount, 1)
+        {
+            @Override
+            public boolean allowRetry(int retryCount, long elapsedTimeMs, 
RetrySleeper sleeper)
+            {
+                count.incrementAndGet();
+                return super.allowRetry(retryCount, elapsedTimeMs, sleeper);
+            }
+        };
+    }
+}

Reply via email to