This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-559-fix-nested-retry-loops in repository https://gitbox.apache.org/repos/asf/curator.git
commit ad7e76afea8ca949b2c89c52ac928ecddaf54761 Author: randgalt <[email protected]> AuthorDate: Tue Feb 18 09:31:45 2020 -0500 CURATOR-559 The retry loop mechanism ended up getting nested multiple times causing exponential calls to the retry policy and violating a given policy's limits. Use a thread local to mitigate this so that a retry loop is reused for nested API calls, etc. --- .../org/apache/curator/CuratorZookeeperClient.java | 2 +- .../main/java/org/apache/curator/RetryLoop.java | 94 ++----------- .../java/org/apache/curator/RetryLoopImpl.java | 100 +++++++++++++ .../StandardConnectionHandlingPolicy.java | 28 ++-- .../curator/connection/ThreadLocalRetryLoop.java | 156 +++++++++++++++++++++ .../java/org/apache/curator/TestEnsurePath.java | 4 +- .../connection/TestThreadLocalRetryLoop.java | 118 ++++++++++++++++ 7 files changed, 410 insertions(+), 92 deletions(-) diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java index 7977541..167695f 100644 --- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java +++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java @@ -171,7 +171,7 @@ public class CuratorZookeeperClient implements Closeable */ public RetryLoop newRetryLoop() { - return new RetryLoop(retryPolicy.get(), tracer); + return new RetryLoopImpl(retryPolicy.get(), tracer); } /** diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java index 51df662..60b4a35 100644 --- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java +++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java @@ -18,15 +18,8 @@ */ package org.apache.curator; -import org.apache.curator.drivers.EventTrace; -import org.apache.curator.drivers.TracerDriver; -import org.apache.curator.utils.DebugUtils; import 
org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; /** * <p>Mechanism to perform an operation on Zookeeper that is safe against @@ -56,34 +49,22 @@ import java.util.concurrent.atomic.AtomicReference; * } * } * </pre> + * + * <p> + * Note: this is an {@code abstract class} instead of an {@code interface} for historical reasons. It was originally a class + * and if it becomes an interface we risk {@link java.lang.IncompatibleClassChangeError}s with clients. + * </p> */ -public class RetryLoop +public abstract class RetryLoop { - private boolean isDone = false; - private int retryCount = 0; - - private final Logger log = LoggerFactory.getLogger(getClass()); - private final long startTimeMs = System.currentTimeMillis(); - private final RetryPolicy retryPolicy; - private final AtomicReference<TracerDriver> tracer; - - private static final RetrySleeper sleeper = new RetrySleeper() - { - @Override - public void sleepFor(long time, TimeUnit unit) throws InterruptedException - { - unit.sleep(time); - } - }; - /** * Returns the default retry sleeper * * @return sleeper */ - public static RetrySleeper getDefaultRetrySleeper() + public static RetrySleeper getDefaultRetrySleeper() { - return sleeper; + return RetryLoopImpl.getRetrySleeper(); } /** @@ -95,34 +76,22 @@ public class RetryLoop * @return procedure result * @throws Exception any non-retriable errors */ - public static<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception + public static <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception { return client.getConnectionHandlingPolicy().callWithRetry(client, proc); } - RetryLoop(RetryPolicy retryPolicy, AtomicReference<TracerDriver> tracer) - { - this.retryPolicy = retryPolicy; - this.tracer = tracer; - } - /** * If true is returned, 
make an attempt at the operation * * @return true/false */ - public boolean shouldContinue() - { - return !isDone; - } + public abstract boolean shouldContinue(); /** * Call this when your operation has successfully completed */ - public void markComplete() - { - isDone = true; - } + public abstract void markComplete(); /** * Utility - return true if the given Zookeeper result code is retry-able @@ -130,7 +99,7 @@ public class RetryLoop * @param rc result code * @return true/false */ - public static boolean shouldRetry(int rc) + public static boolean shouldRetry(int rc) { return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) || (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) || @@ -145,11 +114,11 @@ public class RetryLoop * @param exception exception to check * @return true/false */ - public static boolean isRetryException(Throwable exception) + public static boolean isRetryException(Throwable exception) { if ( exception instanceof KeeperException ) { - KeeperException keeperException = (KeeperException)exception; + KeeperException keeperException = (KeeperException)exception; return shouldRetry(keeperException.code().intValue()); } return false; @@ -161,38 +130,5 @@ public class RetryLoop * @param exception the exception * @throws Exception if not retry-able or the retry policy returned negative */ - public void takeException(Exception exception) throws Exception - { - boolean rethrow = true; - if ( isRetryException(exception) ) - { - if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) - { - log.debug("Retry-able exception received", exception); - } - - if ( retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs, sleeper) ) - { - new EventTrace("retries-allowed", tracer.get()).commit(); - if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) - { - log.debug("Retrying operation"); - } - rethrow = false; - } - else - { - new EventTrace("retries-disallowed", tracer.get()).commit(); - 
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) - { - log.debug("Retry policy not allowing retry"); - } - } - } - - if ( rethrow ) - { - throw exception; - } - } + public abstract void takeException(Exception exception) throws Exception; } diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java new file mode 100644 index 0000000..bc1c244 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/RetryLoopImpl.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator; + +import org.apache.curator.drivers.EventTrace; +import org.apache.curator.drivers.TracerDriver; +import org.apache.curator.utils.DebugUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicReference; + +class RetryLoopImpl extends RetryLoop +{ + private boolean isDone = false; + private int retryCount = 0; + + private final Logger log = LoggerFactory.getLogger(getClass()); + private final long startTimeMs = System.currentTimeMillis(); + private final RetryPolicy retryPolicy; + private final AtomicReference<TracerDriver> tracer; + + private static final RetrySleeper sleeper = (time, unit) -> unit.sleep(time); + + RetryLoopImpl(RetryPolicy retryPolicy, AtomicReference<TracerDriver> tracer) + { + this.retryPolicy = retryPolicy; + this.tracer = tracer; + } + + static RetrySleeper getRetrySleeper() + { + return sleeper; + } + + + @Override + public boolean shouldContinue() + { + return !isDone; + } + + @Override + public void markComplete() + { + isDone = true; + } + + @Override + public void takeException(Exception exception) throws Exception + { + boolean rethrow = true; + if ( RetryLoop.isRetryException(exception) ) + { + if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) + { + log.debug("Retry-able exception received", exception); + } + + if ( retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs, sleeper) ) + { + new EventTrace("retries-allowed", tracer.get()).commit(); + if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) + { + log.debug("Retrying operation"); + } + rethrow = false; + } + else + { + new EventTrace("retries-disallowed", tracer.get()).commit(); + if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) + { + log.debug("Retry policy not allowing retry"); + } + } + } + + if ( rethrow ) + { + throw exception; + } + } +} diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java index 41b342c..681fc84 100644 --- a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java +++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java @@ -56,20 +56,28 @@ public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolic client.internalBlockUntilConnectedOrTimedOut(); T result = null; - RetryLoop retryLoop = client.newRetryLoop(); - while ( retryLoop.shouldContinue() ) + ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop(); + RetryLoop retryLoop = threadLocalRetryLoop.getRetryLoop(client::newRetryLoop); + try { - try + while ( retryLoop.shouldContinue() ) { - result = proc.call(); - retryLoop.markComplete(); - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - retryLoop.takeException(e); + try + { + result = proc.call(); + retryLoop.markComplete(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + retryLoop.takeException(e); + } } } + finally + { + threadLocalRetryLoop.release(); + } return result; } diff --git a/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java new file mode 100644 index 0000000..225b967 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/connection/ThreadLocalRetryLoop.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.connection; + +import org.apache.curator.RetryLoop; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * <p> + * Retry loops can easily end up getting nested which can cause exponential calls of the retry policy + * (see https://issues.apache.org/jira/browse/CURATOR-559). This utility works around that by using + * an internal ThreadLocal to hold a retry loop. As a result, if the retry loop fails anywhere in the chain + * of nested calls, it also fails for the rest of the nested calls instead of being retried again at each level. 
+ * </p> + * + * <p> + * Example usage: + * + * <code><pre> + * ThreadLocalRetryLoop threadLocalRetryLoop = new ThreadLocalRetryLoop(); + * RetryLoop retryLoop = threadLocalRetryLoop.getRetryLoop(client::newRetryLoop); + * try + * { + * while ( retryLoop.shouldContinue() ) + * { + * try + * { + * // do work + * retryLoop.markComplete(); + * } + * catch ( Exception e ) + * { + * ThreadUtils.checkInterrupted(e); + * retryLoop.takeException(e); + * } + * } + * } + * finally + * { + * threadLocalRetryLoop.release(); + * } + * </pre></code> + * </p> + */ +public class ThreadLocalRetryLoop +{ + private static final Logger log = LoggerFactory.getLogger(ThreadLocalRetryLoop.class); + private static final ThreadLocal<Entry> threadLocal = new ThreadLocal<>(); + + private static class Entry + { + private final RetryLoop retryLoop; + private int counter; + + Entry(RetryLoop retryLoop) + { + this.retryLoop = retryLoop; + } + } + + private static class WrappedRetryLoop extends RetryLoop + { + private final RetryLoop retryLoop; + private Exception takenException; + + public WrappedRetryLoop(RetryLoop retryLoop) + { + this.retryLoop = retryLoop; + } + + @Override + public boolean shouldContinue() + { + return retryLoop.shouldContinue() && (takenException == null); + } + + @Override + public void markComplete() + { + retryLoop.markComplete(); + } + + @Override + public void takeException(Exception exception) throws Exception + { + if ( takenException != null ) + { + if ( exception.getClass() != takenException.getClass() ) + { + log.error("Multiple exceptions in retry loop", exception); + } + throw takenException; + } + try + { + retryLoop.takeException(exception); + } + catch ( Exception e ) + { + takenException = e; + throw e; + } + } + } + + /** + * Call to get the current retry loop. If there isn't one, one is allocated + * via {@code newRetryLoopSupplier}. + * + * @param newRetryLoopSupplier supply a new retry loop when needed. 
Normally you should use {@link org.apache.curator.CuratorZookeeperClient#newRetryLoop()} + * @return retry loop to use + */ + public RetryLoop getRetryLoop(Supplier<RetryLoop> newRetryLoopSupplier) + { + Entry entry = threadLocal.get(); + if ( entry == null ) + { + entry = new Entry(new WrappedRetryLoop(newRetryLoopSupplier.get())); + threadLocal.set(entry); + } + ++entry.counter; + return entry.retryLoop; + } + + /** + * Must be called to release the retry loop. See {@link org.apache.curator.connection.StandardConnectionHandlingPolicy#callWithRetry(org.apache.curator.CuratorZookeeperClient, java.util.concurrent.Callable)} + * for an example usage. + */ + public void release() + { + Entry entry = Objects.requireNonNull(threadLocal.get(), "No retry loop was set - unbalanced call to release()"); + if ( --entry.counter <= 0 ) + { + threadLocal.remove(); + } + } +} diff --git a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java index ba37d60..8eed2bf 100644 --- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java +++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java @@ -51,7 +51,7 @@ public class TestEnsurePath ZooKeeper client = mock(ZooKeeper.class, Mockito.RETURNS_MOCKS); CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class); RetryPolicy retryPolicy = new RetryOneTime(1); - RetryLoop retryLoop = new RetryLoop(retryPolicy, null); + RetryLoop retryLoop = new RetryLoopImpl(retryPolicy, null); when(curator.getConnectionHandlingPolicy()).thenReturn(new StandardConnectionHandlingPolicy()); when(curator.getZooKeeper()).thenReturn(client); when(curator.getRetryPolicy()).thenReturn(retryPolicy); @@ -76,7 +76,7 @@ public class TestEnsurePath { ZooKeeper client = mock(ZooKeeper.class, Mockito.RETURNS_MOCKS); RetryPolicy retryPolicy = new RetryOneTime(1); - RetryLoop retryLoop = new RetryLoop(retryPolicy, null); + RetryLoop retryLoop = new 
RetryLoopImpl(retryPolicy, null); final CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class); when(curator.getConnectionHandlingPolicy()).thenReturn(new StandardConnectionHandlingPolicy()); when(curator.getZooKeeper()).thenReturn(client); diff --git a/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java b/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java new file mode 100644 index 0000000..785c2c2 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/connection/TestThreadLocalRetryLoop.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.connection; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.RetrySleeper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.zookeeper.KeeperException; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestThreadLocalRetryLoop extends CuratorTestBase +{ + private static final int retryCount = 4; + + @Test(description = "Check for fix for CURATOR-559") + public void testRecursingRetry() throws Exception + { + AtomicInteger count = new AtomicInteger(); + try (CuratorFramework client = newClient(count)) + { + prep(client); + doLock(client); + Assert.assertEquals(count.get(), retryCount + 1); // Curator's retry policy has been off by 1 since inception - we might consider fixing it someday + } + } + + @Test(description = "Check for fix for CURATOR-559 with multiple threads") + public void testThreadedRecursingRetry() throws Exception + { + final int threadQty = 4; + ExecutorService executorService = Executors.newFixedThreadPool(threadQty); + AtomicInteger count = new AtomicInteger(); + try (CuratorFramework client = newClient(count)) + { + prep(client); + for ( int i = 0; i < threadQty; ++i ) + { + executorService.submit(() -> doLock(client)); + } + executorService.shutdown(); + executorService.awaitTermination(timing.milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertEquals(count.get(), threadQty * (retryCount + 1)); // Curator's retry policy has been off by 1 since inception - we might consider fixing it someday + } + } + + 
@Test(expectedExceptions = NullPointerException.class) + public void testBadReleaseWithNoGet() + { + ThreadLocalRetryLoop retryLoopStack = new ThreadLocalRetryLoop(); + retryLoopStack.release(); + } + + private CuratorFramework newClient(AtomicInteger count) + { + RetryPolicy retryPolicy = makeRetryPolicy(count); + return CuratorFrameworkFactory.newClient(server.getConnectString(), 100, 100, retryPolicy); + } + + private void prep(CuratorFramework client) throws Exception + { + client.start(); + client.create().forPath("/test"); + server.stop(); + } + + private Void doLock(CuratorFramework client) throws Exception + { + InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/test/lock"); + try + { + lock.readLock().acquire(); + Assert.fail("Should have thrown an exception"); + } + catch ( KeeperException ignore ) + { + // correct + } + return null; + } + + private RetryPolicy makeRetryPolicy(AtomicInteger count) + { + return new RetryNTimes(retryCount, 1) + { + @Override + public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) + { + count.incrementAndGet(); + return super.allowRetry(retryCount, elapsedTimeMs, sleeper); + } + }; + } +}
