This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-505 in repository https://gitbox.apache.org/repos/asf/curator.git
commit b18af513b4851c8ecb9486a957971d7dfa179ceb Author: randgalt <[email protected]> AuthorDate: Mon Feb 11 07:46:55 2019 -0500 CURATOR-505 - Some refactoring and more doc --- .../framework/listen/MappingListenerManager.java | 13 --- .../framework/listen/StandardListenerManager.java | 14 ++- .../framework/state/ConnectionStateManager.java | 3 +- .../cache/TestPathChildrenCacheInCluster.java | 106 ++++++++++++++++++++- .../org/apache/curator/test/BaseClassForTests.java | 5 + 5 files changed, 123 insertions(+), 18 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java index f230da9..bd9f51a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.UnaryOperator; /** * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that @@ -39,18 +38,6 @@ public class MappingListenerManager<K, V> implements ListenerManager<K, V> private final Function<K, V> mapper; /** - * Returns a new mapping container that maps to the same type - * - * @param mapper listener mapper/wrapper - * @return new container - */ - public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper) - { - MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper); - return new StandardListenerManager<>(container); - } - - /** * Returns a new container that wraps listeners using the given mapper * * @param mapper listener mapper/wrapper diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java index 8d239ca..e07fe47 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java @@ -18,10 +18,10 @@ */ package org.apache.curator.framework.listen; -import java.util.Objects; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.UnaryOperator; /** * Non mapping version of a listener container @@ -41,6 +41,18 @@ public class StandardListenerManager<T> implements ListenerManager<T, T> return new StandardListenerManager<>(container); } + /** + * Returns a new mapping container that maps to the same type + * + * @param mapper listener mapper/wrapper + * @return new container + */ + public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper) + { + MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper); + return new StandardListenerManager<>(container); + } + @Override public void addListener(T listener) { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 583b9f2..55e17c8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -22,7 +22,6 @@ package org.apache.curator.framework.state; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.listen.MappingListenerManager; import org.apache.curator.framework.listen.StandardListenerManager; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ThreadUtils; @@ -114,7 +113,7 @@ public class ConnectionStateManager implements Closeable threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); } service = Executors.newSingleThreadExecutor(threadFactory); - listeners = MappingListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener)); + listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener)); } /** diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java index cd87125..8dae57b 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java @@ -19,23 +19,125 @@ package org.apache.curator.framework.recipes.cache; import com.google.common.collect.Queues; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionStateListenerDecorator; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class TestPathChildrenCacheInCluster extends BaseClassForTests { + @Override + protected void createServer() + { + // do nothing + } + + @Test + public void testWithCircuitBreaker() throws Exception + { + Timing timing = new Timing(); + try ( TestingCluster cluster = new TestingCluster(3) ) + { + cluster.start(); + + ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds())); + Iterator<InstanceSpec> iterator = cluster.getInstances().iterator(); + InstanceSpec client1Instance = iterator.next(); + InstanceSpec client2Instance = iterator.next(); + ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(100, 3); + try (CuratorFramework client1 = CuratorFrameworkFactory. + builder() + .connectString(client1Instance.getConnectString()) + .retryPolicy(exponentialBackoffRetry) + .sessionTimeoutMs(timing.session()) + .connectionTimeoutMs(timing.connection()) + .connectionStateListenerDecorator(decorator) + .build() + ) + { + client1.start(); + + try ( CuratorFramework client2 = CuratorFrameworkFactory.newClient(client2Instance.getConnectString(), timing.session(), timing.connection(), exponentialBackoffRetry) ) + { + client2.start(); + + AtomicInteger refreshCount = new AtomicInteger(0); + try ( PathChildrenCache cache = new PathChildrenCache(client1, "/test", true) { + @Override + void refresh(RefreshMode mode) throws Exception + { + refreshCount.incrementAndGet(); + super.refresh(mode); + } + } ) + { + cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + + client2.create().forPath("/test/1", "one".getBytes()); + client2.create().forPath("/test/2", "two".getBytes()); + client2.create().forPath("/test/3", "three".getBytes()); + + Future<?> task = Executors.newSingleThreadExecutor().submit(() -> { + try + { + for ( int i = 0; i < 5; ++i ) + { + cluster.killServer(client1Instance); + cluster.restartServer(client1Instance); + timing.sleepABit(); + } + } + catch ( Exception e ) + { + e.printStackTrace(); + } + }); + + client2.create().forPath("/test/4", "four".getBytes()); + client2.create().forPath("/test/5", "five".getBytes()); + client2.delete().forPath("/test/4"); + client2.setData().forPath("/test/1", "1".getBytes()); + client2.create().forPath("/test/6", "six".getBytes()); + + task.get(); + timing.sleepABit(); + + Assert.assertNotNull(cache.getCurrentData("/test/1")); + Assert.assertEquals(cache.getCurrentData("/test/1").getData(), "1".getBytes()); + Assert.assertNotNull(cache.getCurrentData("/test/2")); + Assert.assertEquals(cache.getCurrentData("/test/2").getData(), "two".getBytes()); + Assert.assertNotNull(cache.getCurrentData("/test/3")); + Assert.assertEquals(cache.getCurrentData("/test/3").getData(), "three".getBytes()); + Assert.assertNull(cache.getCurrentData("/test/4")); + Assert.assertNotNull(cache.getCurrentData("/test/5")); + Assert.assertEquals(cache.getCurrentData("/test/5").getData(), "five".getBytes()); + Assert.assertNotNull(cache.getCurrentData("/test/6")); + Assert.assertEquals(cache.getCurrentData("/test/6").getData(), "six".getBytes()); + + Assert.assertEquals(refreshCount.get(), 2); + } + } + } + } + } + @Test(enabled = false) // this test is very flakey - it needs to be re-written at some point public void testMissedDelete() throws Exception { diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index 9ae6a5d..f932ae4 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -98,6 +98,11 @@ public class BaseClassForTests System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true"); System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true"); + createServer(); + } + + protected void createServer() throws Exception + { while ( server == null ) { try
