This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch curator-v2 in repository https://gitbox.apache.org/repos/asf/curator.git
commit 059075caedb061f7fc03a8cebec0a19ee038b668 Author: randgalt <[email protected]> AuthorDate: Sat Nov 2 11:40:44 2019 -0500 wip - new module that will be used to support features in ZK 3.6+ while maintaining background compatability with previous versions of ZK in the other modules. At some point in the future, this can be merged into the main modules and then removed --- .../org/apache/curator/utils/Compatibility.java | 11 + .../curator/utils/DefaultZookeeperFactory.java | 17 ++ .../curator/framework/api/CuratorEventType.java | 7 +- .../framework/imps/CuratorFrameworkImpl.java | 9 + .../framework/imps/ReconfigBuilderImpl.java | 5 +- .../framework/imps/RemoveWatchesBuilderImpl.java | 9 +- .../apache/curator/framework/imps/Watching.java | 10 +- .../curator/framework/imps/TestFramework.java | 5 +- .../curator/framework/imps/TestFrameworkEdges.java | 2 + .../framework/imps/TestReconfiguration.java | 2 +- .../curator/framework/imps/TestWithCluster.java | 2 + .../state/TestConnectionStateManager.java | 2 + .../framework/recipes/cache/BaseTestTreeCache.java | 4 +- .../framework/recipes/cache/TestNodeCache.java | 2 + .../recipes/cache/TestPathChildrenCache.java | 2 + .../cache/TestPathChildrenCacheInCluster.java | 2 + .../framework/recipes/cache/TestTreeCache.java | 2 + .../recipes/leader/ChaosMonkeyCnxnFactory.java | 8 +- .../framework/recipes/leader/TestLeaderLatch.java | 2 + .../locks/TestInterProcessSemaphoreCluster.java | 2 + curator-test/pom.xml | 10 + .../org/apache/curator/test/BaseClassForTests.java | 4 +- .../org/apache/curator/test/Compatibility.java | 93 +++++++ .../org/apache/curator/test/TestingCluster.java | 2 +- .../apache/curator/test/TestingQuorumPeerMain.java | 2 +- .../apache/curator/test/TestingZooKeeperMain.java | 6 +- .../test/compatibility/Zk35MethodInterceptor.java | 1 + curator-v2/pom.xml | 146 +++++++++++ .../curator/framework/api/AddWatchBuilder.java | 19 +- .../curator/framework/api/AddWatchBuilder2.java | 14 +- .../apache/curator/framework/api/AddWatchable.java | 28 +- .../curator/framework/api/WatchesBuilder.java | 20 +- .../framework/imps/AddWatchBuilderImpl.java | 180 +++++++++++++ .../framework/imps/CuratorFrameworkV2Impl.java | 290 +++++++++++++++++++++ .../imps/WatcherRemoveCuratorFrameworkV2Impl.java | 284 ++++++++++++++++++++ .../curator/framework/imps/WatchesBuilderImpl.java | 26 +- .../apache/curator/v2/AsyncCuratorFrameworkV2.java | 69 +++++ .../org/apache/curator/v2/CuratorFrameworkV2.java | 33 ++- .../v2/WatcherRemoveCuratorFrameworkV2.java | 10 +- .../x/async/api/AsyncCuratorFrameworkDslV2.java | 14 +- .../curator/x/async/api/AsyncWatchBuilder.java | 24 +- .../async/details/AsyncCuratorFrameworkV2Impl.java | 189 ++++++++++++++ .../x/async/details/AsyncWatchBuilderImpl.java | 77 ++++++ .../curator/v2/TestAsyncCuratorFrameworkV2.java | 57 ++++ .../org/apache/curator/v2/TestFrameworkV2.java | 85 ++++++ curator-v2/src/test/resources/log4j.properties | 27 ++ .../x/async/details/AsyncCuratorFrameworkImpl.java | 10 + pom.xml | 45 +++- src/site/confluence/zk-compatibility.confluence | 41 ++- 49 files changed, 1804 insertions(+), 107 deletions(-) diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java index 1ee2301..f9810fe 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java +++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java @@ -32,6 +32,7 @@ public class Compatibility { private static final boolean hasZooKeeperAdmin; private static final Method queueEventMethod; + private static final Logger logger = LoggerFactory.getLogger(Compatibility.class); static @@ -74,6 +75,16 @@ public class Compatibility } /** + * Return true if the ZooKeeperAdmin class is available + * + * @return true/false + */ + public static boolean hasZooKeeperAdmin() + { + return hasZooKeeperAdmin; + } + + /** * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>. * For ZooKeeper 3.4.x do the equivalent via reflection * diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java index 42279d0..f936518 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java @@ -20,12 +20,29 @@ package org.apache.curator.utils; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.admin.ZooKeeperAdmin; public class DefaultZookeeperFactory implements ZookeeperFactory { + // hide org.apache.zookeeper.admin.ZooKeeperAdmin in a nested class so that Curator continues to work with ZK 3.4.x + private static class ZooKeeperAdminMaker implements ZookeeperFactory + { + static final ZooKeeperAdminMaker instance = new ZooKeeperAdminMaker(); + + @Override + public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception + { + return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly); + } + } + @Override public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception { + if ( Compatibility.hasZooKeeperAdmin() ) + { + return ZooKeeperAdminMaker.instance.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); + } return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java index 5dea211..d19f49b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java @@ -96,5 +96,10 @@ public enum CuratorEventType /** * Event sent when client is being closed */ - CLOSING + CLOSING, + + /** + * Corresponds to {@code CuratorFrameworkV2.watches().add()} + */ + ADD_WATCH } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index e003bf0..bc59e77 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -620,6 +620,15 @@ public class CuratorFrameworkImpl implements CuratorFramework return client.getZooKeeper(); } + Object getZooKeeperAdmin() throws Exception + { + if ( isZk34CompatibilityMode() ) + { + Preconditions.checkState(!isZk34CompatibilityMode(), "getZooKeeperAdmin() is not supported when running in ZooKeeper 3.4 compatibility mode"); + } + return client.getZooKeeper(); + } + CompressionProvider getCompressionProvider() { return compressionProvider; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 97be59a..9f129ca 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -24,6 +24,7 @@ import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.admin.ZooKeeperAdmin; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import java.util.Arrays; @@ -268,7 +269,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation client.processBackgroundOperation(data, event); } }; - client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); + ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); } catch ( Throwable e ) { @@ -287,7 +288,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation @Override public byte[] call() throws Exception { - return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat); + return ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat); } } ); diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index e14deff..961d5f0 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -201,8 +201,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat } return null; - } - + } + + protected CuratorFrameworkImpl getClient() + { + return client; + } + private void pathInBackground(final String path) { OperationAndData.ErrorCallback<String> errorCallback = null; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index daa5dd3..5bad7e7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -class Watching +public class Watching { private final Watcher watcher; private final CuratorWatcher curatorWatcher; @@ -31,7 +31,7 @@ class Watching private final CuratorFrameworkImpl client; private NamespaceWatcher namespaceWatcher; - Watching(CuratorFrameworkImpl client, boolean watched) + public Watching(CuratorFrameworkImpl client, boolean watched) { this.client = client; this.watcher = null; @@ -39,7 +39,7 @@ class Watching this.watched = watched; } - Watching(CuratorFrameworkImpl client, Watcher watcher) + public Watching(CuratorFrameworkImpl client, Watcher watcher) { this.client = client; this.watcher = watcher; @@ -47,7 +47,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) + public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) { this.client = client; this.watcher = null; @@ -55,7 +55,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client) + public Watching(CuratorFrameworkImpl client) { this.client = client; watcher = null; diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java index fe49ad7..0431829 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java @@ -59,9 +59,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @SuppressWarnings("deprecation") +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestFramework extends BaseClassForTests { - @BeforeMethod + @BeforeMethod(alwaysRun = true) @Override public void setup() throws Exception { @@ -69,7 +70,7 @@ public class TestFramework extends BaseClassForTests super.setup(); } - @AfterMethod + @AfterMethod(alwaysRun = true) @Override public void teardown() throws Exception { diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index 7c6d156..7af039b 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -41,6 +41,7 @@ import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingServer; import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ZKPaths; @@ -66,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestFrameworkEdges extends BaseClassForTests { private final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index c6ff2bb..a6399de 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -57,7 +57,7 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -@Test(groups = Zk35MethodInterceptor.zk35Group) +@Test(groups = {Zk35MethodInterceptor.zk35Group, Zk35MethodInterceptor.curatorV2Group}) public class TestReconfiguration extends CuratorTestBase { private final Timing2 timing = new Timing2(); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java index 7e8ffbb..079c186 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java @@ -18,6 +18,7 @@ */ package org.apache.curator.framework.imps; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -33,6 +34,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import java.util.concurrent.CountDownLatch; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestWithCluster { @Test diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java index c929b41..bb0d569 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java @@ -24,12 +24,14 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestConnectionStateManager extends BaseClassForTests { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java index 175ccdf..246704f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java @@ -96,7 +96,7 @@ public class BaseTestTreeCache extends BaseClassForTests } @Override - @BeforeMethod + @BeforeMethod(alwaysRun = true) public void setup() throws Exception { super.setup(); @@ -111,7 +111,7 @@ public class BaseTestTreeCache extends BaseClassForTests } @Override - @AfterMethod + @AfterMethod(alwaysRun = true) public void teardown() throws Exception { try diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java index ff416d5..f9b8cce 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -40,6 +41,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestNodeCache extends BaseClassForTests { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index 78fabd5..7ec86b9 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@ -32,6 +32,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.zookeeper.CreateMode; @@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.testng.AssertJUnit.assertNotNull; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestPathChildrenCache extends BaseClassForTests { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java index cd87125..083d158 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache; import com.google.common.collect.Queues; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -34,6 +35,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestPathChildrenCacheInCluster extends BaseClassForTests { @Test(enabled = false) // this test is very flakey - it needs to be re-written at some point diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java index 1e97ce2..3e8a3a5 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.zookeeper.CreateMode; @@ -31,6 +32,7 @@ import org.testng.annotations.Test; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestTreeCache extends BaseTestTreeCache { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java index 4cb342c..07e9a17 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java @@ -19,11 +19,11 @@ package org.apache.curator.framework.recipes.leader; +import org.apache.curator.test.Compatibility; import org.apache.curator.test.TestingZooKeeperMain; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.ByteBufferInputStream; -import org.apache.zookeeper.server.NIOServerCnxn; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZooKeeperServer; @@ -92,7 +92,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory log.debug("Rejected : " + si.toString()); // Still reject request log.debug("Still not ready for " + remaining + "ms"); - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); return; } // Submit the request to the legacy Zookeeper server @@ -113,13 +113,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory firstError = System.currentTimeMillis(); // The znode has been created, close the connection and don't tell it to client log.warn("Closing connection right after " + createRequest.getPath() + " creation"); - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); } } catch ( Exception e ) { // Should not happen - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); } } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 3d9e9b7..ddb7f53 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -38,6 +38,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestLeaderLatch extends BaseClassForTests { private static final String PATH_NAME = "/one/two/me"; diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java index ed56f15..72ecf59 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java @@ -30,6 +30,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +@Test(groups = Zk35MethodInterceptor.curatorV2Group) public class TestInterProcessSemaphoreCluster extends BaseClassForTests { @Test diff --git a/curator-test/pom.xml b/curator-test/pom.xml index 3683b7d..4f0c5a2 100644 --- a/curator-test/pom.xml +++ b/curator-test/pom.xml @@ -41,6 +41,16 @@ </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index 51af821..79c8b9a 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -113,7 +113,7 @@ public class BaseClassForTests context.getSuite().addListener(methodListener2); } - @BeforeMethod + @BeforeMethod(alwaysRun = true) public void setup() throws Exception { if ( INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES != null ) @@ -142,7 +142,7 @@ public class BaseClassForTests } } - @AfterMethod + @AfterMethod(alwaysRun = true) public void teardown() throws Exception { System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY); diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java index 5b4b53f..1f8d181 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java @@ -18,10 +18,103 @@ */ package org.apache.curator.test; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerCnxnFactory; +import java.lang.reflect.Method; + +@SuppressWarnings("unchecked") public class Compatibility { + private static final Method closeAllWithReasonMethod; + private static final Method closeAllMethod; + private static final Method closeWithReasonMethod; + private static final Method closeMethod; + private static final Object disconnectReasonObj; + + static + { + Object localDisconnectReasonObj; + Method localCloseAllWithReasonMethod; + Method localCloseAllMethod; + Method localCloseWithReasonMethod; + Method localCloseMethod; + try + { + Class disconnectReasonClass = Class.forName("org.apache.zookeeper.server.ServerCnxn$DisconnectReason"); + localDisconnectReasonObj = Enum.valueOf(disconnectReasonClass, "UNKNOWN"); + localCloseAllWithReasonMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll", disconnectReasonClass); + localCloseWithReasonMethod = ServerCnxn.class.getDeclaredMethod("close", disconnectReasonClass); + localCloseAllMethod = null; + localCloseMethod = null; + + localCloseAllWithReasonMethod.setAccessible(true); + localCloseWithReasonMethod.setAccessible(true); + } + catch ( Throwable e ) + { + localDisconnectReasonObj = null; + localCloseAllWithReasonMethod = null; + localCloseWithReasonMethod = null; + try + { + localCloseAllMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll"); + localCloseMethod = ServerCnxn.class.getDeclaredMethod("close"); + + localCloseAllMethod.setAccessible(true); + localCloseMethod.setAccessible(true); + } + catch ( Throwable ex ) + { + throw new IllegalStateException("Could not reflectively find ServerCnxnFactory/ServerCnxn close methods"); + } + } + disconnectReasonObj = localDisconnectReasonObj; + closeAllWithReasonMethod = localCloseAllWithReasonMethod; + closeAllMethod = localCloseAllMethod; + closeMethod = localCloseMethod; + closeWithReasonMethod = localCloseWithReasonMethod; + } + public static boolean isZK34() { return false; } + + public static void serverCnxnFactoryCloseAll(ServerCnxnFactory factory) + { + try + { + if ( closeAllMethod != null ) + { + closeAllMethod.invoke(factory); + } + else + { + closeAllWithReasonMethod.invoke(factory, disconnectReasonObj); + } + } + catch ( Exception e ) + { + throw new RuntimeException("Could not close factory", e); + } + } + + public static void serverCnxnClose(ServerCnxn cnxn) + { + try + { + if ( closeMethod != null ) + { + closeMethod.invoke(cnxn); + } + else + { + closeWithReasonMethod.invoke(cnxn, disconnectReasonObj); + } + } + catch ( Exception e ) + { + throw new RuntimeException("Could not close connection", e); + } + } } diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java index 3d38fe1..58da2c0 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java @@ -225,7 +225,7 @@ public class TestingCluster implements Closeable */ public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception { - Method m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress"); + Method m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress"); m.setAccessible(true); InetSocketAddress address = (InetSocketAddress)m.invoke(client); if ( address != null ) diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java index 3b3ab26..7489527 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java @@ -39,7 +39,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace Field cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory"); cnxnFactoryField.setAccessible(true); ServerCnxnFactory cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer); - cnxnFactory.closeAll(); + Compatibility.serverCnxnFactoryCloseAll(cnxnFactory); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java index 841df77..91f185f 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java @@ -81,7 +81,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { if ( cnxnFactory != null ) { - cnxnFactory.closeAll(); + Compatibility.serverCnxnFactoryCloseAll(cnxnFactory); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); @@ -262,7 +262,9 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config) { - super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null); + this.setTxnLogFactory(txnLog); + this.setMinSessionTimeout(config.getMinSessionTimeout()); + this.setMaxSessionTimeout(config.getMaxSessionTimeout()); } private final AtomicBoolean isRunning = new AtomicBoolean(false); diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java index 8072b68..10fafde 100644 --- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java +++ b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java @@ -29,6 +29,7 @@ import java.util.List; public class Zk35MethodInterceptor implements IMethodInterceptor { public static final String zk35Group = "zk35"; + public static final String curatorV2Group = "curatorV2"; @Override public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context) diff --git a/curator-v2/pom.xml b/curator-v2/pom.xml new file mode 100644 index 0000000..0ff537a --- /dev/null +++ b/curator-v2/pom.xml @@ -0,0 +1,146 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.curator</groupId> + <artifactId>apache-curator</artifactId> + <version>4.2.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>curator-v2</artifactId> + + <properties> + <zookeeper-new-version>3.6.0-SNAPSHOT</zookeeper-new-version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-async</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>${zookeeper-new-version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <dependenciesToScan> + <dependency>org.apache.curator:curator-framework</dependency> + <dependency>org.apache.curator:curator-recipes</dependency> + </dependenciesToScan> + <groups>curatorV2</groups> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java similarity index 68% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java index 5b4b53f..ad6d434 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java @@ -16,12 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; +package org.apache.curator.framework.api; -public class Compatibility +import org.apache.zookeeper.AddWatchMode; + +public interface AddWatchBuilder extends AddWatchBuilder2 { - public static boolean isZK34() - { - return false; - } -} + /** + * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used + * + * @param mode mode to use + * @return this + */ + AddWatchBuilder2 withMode(AddWatchMode mode); +} \ No newline at end of file diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java similarity index 81% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java index 5b4b53f..9114c00 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java @@ -16,12 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; -public class Compatibility +package org.apache.curator.framework.api; + +public interface AddWatchBuilder2 extends + Backgroundable<AddWatchable<Pathable<Void>>>, + AddWatchable<Pathable<Void>>, + Pathable<Void> { - public static boolean isZK34() - { - return false; - } -} +} \ No newline at end of file diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchable.java similarity index 68% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchable.java index 42279d0..1f0646c 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-v2/src/main/java/org/apache/curator/framework/api/AddWatchable.java @@ -16,16 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; + +package org.apache.curator.framework.api; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -public class DefaultZookeeperFactory implements ZookeeperFactory +public interface AddWatchable<T> { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(Watcher watcher); + + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(CuratorWatcher watcher); +} \ No newline at end of file diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java similarity index 75% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-v2/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java index 5b4b53f..3cd5528 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-v2/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java @@ -16,12 +16,18 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; -public class Compatibility +package org.apache.curator.framework.api; + +/** + * Builder to allow watches to be removed + */ +public interface WatchesBuilder extends RemoveWatchesBuilder { - public static boolean isZK34() - { - return false; - } -} + /** + * Start an add watch operation + * + * @return builder + */ + AddWatchBuilder add(); +} \ No newline at end of file diff --git a/curator-v2/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java new file mode 100644 index 0000000..ed77d9f --- /dev/null +++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.imps; + +import org.apache.curator.RetryLoop; +import org.apache.curator.drivers.OperationTrace; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.AddWatchBuilder; +import org.apache.curator.framework.api.AddWatchBuilder2; +import org.apache.curator.framework.api.AddWatchable; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; +import java.util.concurrent.Executor; + +public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String> +{ + private final CuratorFrameworkImpl client; + private Watching watching; + private Backgrounding backgrounding = new Backgrounding(); + private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE; + + AddWatchBuilderImpl(CuratorFrameworkImpl client) + { + this.client = client; + watching = new Watching(client, true); + } + + public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode) + { + this.client = client; + this.watching = watching; + this.backgrounding = backgrounding; + this.mode = mode; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground() + { + backgrounding = new Backgrounding(); + return this; + } + + @Override + public AddWatchBuilder2 withMode(AddWatchMode mode) + { + this.mode = mode; + return this; + } + + @Override + public Pathable<Void> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public Pathable<Void> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(Object context) + { + backgrounding = new Backgrounding(context); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback) + { + backgrounding = new Backgrounding(callback); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context) + { + backgrounding = new Backgrounding(callback, context); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor) + { + backgrounding = new Backgrounding(callback, executor); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; + } + + @Override + public Void forPath(String path) throws Exception + { + if ( backgrounding.inBackground() ) + { + client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); + } + else + { + pathInForeground(path); + } + return null; + } + + @Override + public void performBackgroundOperation(final OperationAndData<String> data) throws Exception + { + String path = data.getData(); + String fixedPath = client.fixForNamespace(path); + try + { + final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background"); + client.getZooKeeper().addWatch + ( + fixedPath, + watching.getWatcher(path), + mode, + (rc, path1, ctx) -> { + trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null); + client.processBackgroundOperation(data, event); + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { + backgrounding.checkError(e, watching); + } + } + + private void pathInForeground(final String path) throws Exception + { + final String fixedPath = client.fixForNamespace(path); + OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground"); + RetryLoop.callWithRetry + ( + client.getZookeeperClient(), () -> { + if ( watching.isWatched() ) + { + client.getZooKeeper().addWatch(fixedPath, mode); + } + else + { + client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode); + } + return null; + }); + trace.setPath(fixedPath).setWithWatcher(true).commit(); + } +} \ No newline at end of file diff --git a/curator-v2/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkV2Impl.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkV2Impl.java new file mode 100644 index 0000000..338ca6f --- /dev/null +++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkV2Impl.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.*; +import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.TransactionOp; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.schema.SchemaSet; +import org.apache.curator.framework.state.ConnectionStateErrorPolicy; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.v2.CuratorFrameworkV2; +import org.apache.curator.v2.WatcherRemoveCuratorFrameworkV2; +import org.apache.curator.utils.EnsurePath; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class CuratorFrameworkV2Impl implements CuratorFrameworkV2 +{ + private final CuratorFrameworkImpl client; + + public CuratorFrameworkV2Impl(CuratorFramework client) + { + this.client = reveal(client); + } + + private static CuratorFrameworkImpl reveal(CuratorFramework client) + { + try + { + return (CuratorFrameworkImpl)Objects.requireNonNull(client, "client cannot be null"); + } + catch ( Exception e ) + { + throw new IllegalArgumentException("Only Curator clients created through CuratorFrameworkFactory are supported: " + client.getClass().getName()); + } + } + + @Override + public void start() + { + client.start(); + } + + @Override + public void close() + { + client.close(); + } + + @Override + public CuratorFrameworkState getState() + { + return client.getState(); + } + + @Override + public boolean isStarted() + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public CreateBuilder create() + { + return client.create(); + } + + @Override + public DeleteBuilder delete() + { + return client.delete(); + } + + @Override + public ExistsBuilder checkExists() + { + return client.checkExists(); + } + + @Override + public GetDataBuilder getData() + { + return client.getData(); + } + + @Override + public SetDataBuilder setData() + { + return client.setData(); + } + + @Override + public GetChildrenBuilder getChildren() + { + return client.getChildren(); + } + + @Override + public GetACLBuilder getACL() + { + return client.getACL(); + } + + @Override + public SetACLBuilder setACL() + { + return client.setACL(); + } + + @Override + public ReconfigBuilder reconfig() + { + return client.reconfig(); + } + + @Override + public GetConfigBuilder getConfig() + { + return client.getConfig(); + } + + @SuppressWarnings("deprecation") + @Override + public CuratorTransaction inTransaction() + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public CuratorMultiTransaction transaction() + { + return client.transaction(); + } + + @Override + public TransactionOp transactionOp() + { + return client.transactionOp(); + } + + @Override + public void sync(String path, Object backgroundContextObject) + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public void createContainers(String path) throws Exception + { + client.createContainers(path); + } + + @Override + public SyncBuilder sync() + { + return client.sync(); + } + + @Override + public WatchesBuilder watches() + { + return new WatchesBuilderImpl(client); + } + + @Override + public Listenable<ConnectionStateListener> getConnectionStateListenable() + { + return client.getConnectionStateListenable(); + } + + @Override + public Listenable<CuratorListener> getCuratorListenable() + { + return client.getCuratorListenable(); + } + + @Override + public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() + { + return client.getUnhandledErrorListenable(); + } + + @Override + public CuratorFramework nonNamespaceView() + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public CuratorFrameworkV2 usingNamespace(String newNamespace) + { + return CuratorFrameworkV2.wrap(client.usingNamespace(newNamespace)); + } + + @Override + public String getNamespace() + { + return client.getNamespace(); + } + + @Override + public CuratorZookeeperClient getZookeeperClient() + { + return client.getZookeeperClient(); + } + + @SuppressWarnings("deprecation") + @Override + public EnsurePath newNamespaceAwareEnsurePath(String path) + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public void clearWatcherReferences(Watcher watcher) + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException + { + return client.blockUntilConnected(maxWaitTime, units); + } + + @Override + public void blockUntilConnected() throws InterruptedException + { + client.blockUntilConnected(); + } + + @Override + public WatcherRemoveCuratorFrameworkV2 newWatcherRemoveCuratorFramework() + { + return new WatcherRemoveCuratorFrameworkV2Impl(client, client.newWatcherRemoveCuratorFramework()); + } + + @Override + public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() + { + return client.getConnectionStateErrorPolicy(); + } + + @Override + public QuorumVerifier getCurrentConfig() + { + return client.getCurrentConfig(); + } + + @Override + public SchemaSet getSchemaSet() + { + return client.getSchemaSet(); + } + + @Override + public boolean isZk34CompatibilityMode() + { + return client.isZk34CompatibilityMode(); + } + + @Override + public CompletableFuture<Void> runSafe(Runnable runnable) + { + return client.runSafe(runnable); + } +} diff --git a/curator-v2/src/main/java/org/apache/curator/framework/imps/WatcherRemoveCuratorFrameworkV2Impl.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatcherRemoveCuratorFrameworkV2Impl.java new file mode 100644 index 0000000..67daf3e --- /dev/null +++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatcherRemoveCuratorFrameworkV2Impl.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; +import org.apache.curator.framework.api.*; +import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.TransactionOp; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.schema.SchemaSet; +import org.apache.curator.framework.state.ConnectionStateErrorPolicy; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.v2.CuratorFrameworkV2; +import org.apache.curator.v2.WatcherRemoveCuratorFrameworkV2; +import org.apache.curator.utils.EnsurePath; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +class WatcherRemoveCuratorFrameworkV2Impl implements WatcherRemoveCuratorFrameworkV2 +{ + private final CuratorFrameworkImpl client; + private final WatcherRemoveCuratorFramework facade; + + WatcherRemoveCuratorFrameworkV2Impl(CuratorFrameworkImpl client, WatcherRemoveCuratorFramework facade) + { + this.client = client; + this.facade = facade; + } + + @Override + public void removeWatchers() + { + facade.removeWatchers(); + } + + @Override + public void start() + { + facade.start(); + } + + @Override + public void close() + { + facade.close(); + } + + @Override + public CuratorFrameworkState getState() + { + return facade.getState(); + } + + @Override + public boolean isStarted() + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public CreateBuilder create() + { + return facade.create(); + } + + @Override + public DeleteBuilder delete() + { + return facade.delete(); + } + + @Override + public ExistsBuilder checkExists() + { + return facade.checkExists(); + } + + @Override + public GetDataBuilder getData() + { + return facade.getData(); + } + + @Override + public SetDataBuilder setData() + { + return facade.setData(); + } + + @Override + public GetChildrenBuilder getChildren() + { + return facade.getChildren(); + } + + @Override + public GetACLBuilder getACL() + { + return facade.getACL(); + } + + @Override + public SetACLBuilder setACL() + { + return facade.setACL(); + } + + @Override + public ReconfigBuilder reconfig() + { + return facade.reconfig(); + } + + @Override + public GetConfigBuilder getConfig() + { + return facade.getConfig(); + } + + @Override + public CuratorTransaction inTransaction() + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public CuratorMultiTransaction transaction() + { + return facade.transaction(); + } + + @Override + public TransactionOp transactionOp() + { + return facade.transactionOp(); + } + + @Override + public void sync(String path, Object backgroundContextObject) + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public void createContainers(String path) throws Exception + { + facade.createContainers(path); + } + + @Override + public SyncBuilder sync() + { + return facade.sync(); + } + + @Override + public WatchesBuilder watches() + { + return new WatchesBuilderImpl(client); + } + + @Override + public Listenable<ConnectionStateListener> getConnectionStateListenable() + { + return facade.getConnectionStateListenable(); + } + + @Override + public Listenable<CuratorListener> getCuratorListenable() + { + return facade.getCuratorListenable(); + } + + @Override + public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() + { + return facade.getUnhandledErrorListenable(); + } + + @Override + public CuratorFramework nonNamespaceView() + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public CuratorFrameworkV2 usingNamespace(String newNamespace) + { + return CuratorFrameworkV2.wrap(facade.usingNamespace(newNamespace)); + } + + @Override + public String getNamespace() + { + return facade.getNamespace(); + } + + @Override + public CuratorZookeeperClient getZookeeperClient() + { + return facade.getZookeeperClient(); + } + + @Override + public EnsurePath newNamespaceAwareEnsurePath(String path) + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public void clearWatcherReferences(Watcher watcher) + { + throw new UnsupportedOperationException("deprecated"); + } + + @Override + public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException + { + return facade.blockUntilConnected(maxWaitTime, units); + } + + @Override + public void blockUntilConnected() throws InterruptedException + { + facade.blockUntilConnected(); + } + + @Override + public WatcherRemoveCuratorFrameworkV2 newWatcherRemoveCuratorFramework() + { + return new WatcherRemoveCuratorFrameworkV2Impl(client, facade.newWatcherRemoveCuratorFramework()); + } + + @Override + public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() + { + return facade.getConnectionStateErrorPolicy(); + } + + @Override + public QuorumVerifier getCurrentConfig() + { + return facade.getCurrentConfig(); + } + + @Override + public SchemaSet getSchemaSet() + { + return facade.getSchemaSet(); + } + + @Override + public boolean isZk34CompatibilityMode() + { + return facade.isZk34CompatibilityMode(); + } + + @Override + public CompletableFuture<Void> runSafe(Runnable runnable) + { + return facade.runSafe(runnable); + } +} diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java similarity index 50% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-v2/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java index 42279d0..a840ffe 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-v2/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java @@ -16,16 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.AddWatchBuilder; +import org.apache.curator.framework.api.WatchesBuilder; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher.WatcherType; -public class DefaultZookeeperFactory implements ZookeeperFactory +public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder { + public WatchesBuilderImpl(CuratorFrameworkImpl client) + { + super(client); + } + + public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding) + { + super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding); + } + @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception + public AddWatchBuilder add() { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); + return new AddWatchBuilderImpl(getClient()); } -} +} \ No newline at end of file diff --git a/curator-v2/src/main/java/org/apache/curator/v2/AsyncCuratorFrameworkV2.java b/curator-v2/src/main/java/org/apache/curator/v2/AsyncCuratorFrameworkV2.java new file mode 100644 index 0000000..745322c --- /dev/null +++ b/curator-v2/src/main/java/org/apache/curator/v2/AsyncCuratorFrameworkV2.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.v2; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.WatchMode; +import org.apache.curator.x.async.api.AsyncCuratorFrameworkDslV2; +import org.apache.curator.x.async.details.AsyncCuratorFrameworkV2Impl; +import org.apache.zookeeper.WatchedEvent; +import java.util.function.UnaryOperator; + +public interface AsyncCuratorFrameworkV2 extends AsyncCuratorFramework, AsyncCuratorFrameworkDslV2 +{ + /** + * Wrap a CuratorFramework instance to gain access to newer ZooKeeper features + * + * @param client client instance + * @return wrapped client + */ + static AsyncCuratorFrameworkV2 wrap(CuratorFramework client) + { + return new AsyncCuratorFrameworkV2Impl(AsyncCuratorFramework.wrap(client)); + } + + /** + * Wrap a AsyncCuratorFramework instance to gain access to newer ZooKeeper features + * + * @param client client instance + * @return wrapped client + */ + static AsyncCuratorFrameworkV2 wrap(AsyncCuratorFramework client) + { + return new AsyncCuratorFrameworkV2Impl(client); + } + + @Override + AsyncCuratorFrameworkDslV2 with(WatchMode mode); + + @Override + AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener); + + @Override + AsyncCuratorFrameworkDslV2 with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter); + + @Override + AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter); + + @Override + AsyncCuratorFrameworkDslV2 with(WatchMode mode, UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter); +} diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/v2/CuratorFrameworkV2.java similarity index 51% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-v2/src/main/java/org/apache/curator/v2/CuratorFrameworkV2.java index 42279d0..a83bb46 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-v2/src/main/java/org/apache/curator/v2/CuratorFrameworkV2.java @@ -16,16 +16,35 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; +package org.apache.curator.v2; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.WatchesBuilder; +import org.apache.curator.framework.imps.CuratorFrameworkV2Impl; -public class DefaultZookeeperFactory implements ZookeeperFactory +public interface CuratorFrameworkV2 extends CuratorFramework { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception + /** + * Wrap a CuratorFramework instance to gain access to newer ZooKeeper features + * + * @param client client instance + * @return wrapped client + */ + static CuratorFrameworkV2 wrap(CuratorFramework client) { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); + return new CuratorFrameworkV2Impl(client); } + + /** + * Start a watches builder + * + * @return builder + */ + WatchesBuilder watches(); + + @Override + CuratorFrameworkV2 usingNamespace(String newNamespace); + + @Override + WatcherRemoveCuratorFrameworkV2 newWatcherRemoveCuratorFramework(); } diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/v2/WatcherRemoveCuratorFrameworkV2.java similarity index 79% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-v2/src/main/java/org/apache/curator/v2/WatcherRemoveCuratorFrameworkV2.java index 5b4b53f..ba0b61c 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-v2/src/main/java/org/apache/curator/v2/WatcherRemoveCuratorFrameworkV2.java @@ -16,12 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; +package org.apache.curator.v2; -public class Compatibility +import org.apache.curator.framework.WatcherRemoveCuratorFramework; + +public interface WatcherRemoveCuratorFrameworkV2 extends WatcherRemoveCuratorFramework, CuratorFrameworkV2 { - public static boolean isZK34() - { - return false; - } } diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDslV2.java similarity index 77% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDslV2.java index 5b4b53f..ae851a8 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDslV2.java @@ -16,12 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; +package org.apache.curator.x.async.api; -public class Compatibility +public interface AsyncCuratorFrameworkDslV2 extends AsyncCuratorFrameworkDsl { - public static boolean isZK34() - { - return false; - } + /** + * Start an add watch builder + * + * @return builder object + */ + AsyncWatchBuilder addWatch(); } diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java similarity index 60% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java index 42279d0..73172c3 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-v2/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java @@ -16,16 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; +package org.apache.curator.x.async.api; -public class DefaultZookeeperFactory implements ZookeeperFactory +import org.apache.curator.framework.api.AddWatchable; +import org.apache.curator.x.async.AsyncStage; +import org.apache.zookeeper.AddWatchMode; + +public interface AsyncWatchBuilder extends AddWatchable<AsyncPathable<AsyncStage<Void>>> { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} + /** + * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used + * + * @param mode mode + * @return this + */ + AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode); +} \ No newline at end of file diff --git a/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkV2Impl.java b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkV2Impl.java new file mode 100644 index 0000000..210a79f --- /dev/null +++ b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkV2Impl.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.v2.AsyncCuratorFrameworkV2; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.WatchMode; +import org.apache.curator.x.async.api.*; +import org.apache.zookeeper.WatchedEvent; +import java.util.Objects; +import java.util.function.UnaryOperator; + +public class AsyncCuratorFrameworkV2Impl implements AsyncCuratorFrameworkV2 +{ + private final AsyncCuratorFrameworkImpl client; + + public AsyncCuratorFrameworkV2Impl(AsyncCuratorFramework client) + { + this(reveal(client)); + } + + private AsyncCuratorFrameworkV2Impl(AsyncCuratorFrameworkImpl client) + { + this.client = client; + } + + private static AsyncCuratorFrameworkImpl reveal(Object client) + { + try + { + return (AsyncCuratorFrameworkImpl)Objects.requireNonNull(client, "client cannot be null"); + } + catch ( Exception e ) + { + throw new IllegalArgumentException("Only AsyncCuratorFramework clients wrapped via AsyncCuratorFramework.wrap(): " + client.getClass().getName()); + } + } + + @Override + public CuratorFramework unwrap() + { + return client.unwrap(); + } + + @Override + public AsyncCuratorFrameworkDslV2 with(WatchMode mode) + { + return new AsyncCuratorFrameworkV2Impl(reveal(client.with(mode))); + } + + @Override + public AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener) + { + return new AsyncCuratorFrameworkV2Impl(reveal(client.with(listener))); + } + + @Override + public AsyncCuratorFrameworkDslV2 with(UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) + { + return new AsyncCuratorFrameworkV2Impl(reveal(client.with(resultFilter, watcherFilter))); + } + + @Override + public AsyncCuratorFrameworkDslV2 with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) + { + return new AsyncCuratorFrameworkV2Impl(reveal(client.with(listener, resultFilter, watcherFilter))); + } + + @Override + public AsyncCuratorFrameworkDslV2 with(WatchMode mode, UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) + { + return new AsyncCuratorFrameworkV2Impl(reveal(client.with(mode, listener, resultFilter, watcherFilter))); + } + + @Override + public AsyncWatchBuilder addWatch() + { + return new AsyncWatchBuilderImpl(client.getClient(), client.getFilters()); + } + + @Override + public WatchableAsyncCuratorFramework watched() + { + return client.watched(); + } + + @Override + public AsyncCreateBuilder create() + { + return client.create(); + } + + @Override + public AsyncDeleteBuilder delete() + { + return client.delete(); + } + + @Override + public AsyncSetDataBuilder setData() + { + return client.setData(); + } + + @Override + public AsyncGetACLBuilder getACL() + { + return client.getACL(); + } + + @Override + public AsyncSetACLBuilder setACL() + { + return client.setACL(); + } + + @Override + public AsyncReconfigBuilder reconfig() + { + return client.reconfig(); + } + + @Override + public AsyncMultiTransaction transaction() + { + return client.transaction(); + } + + @Override + public AsyncTransactionOp transactionOp() + { + return client.transactionOp(); + } + + @Override + public AsyncSyncBuilder sync() + { + return client.sync(); + } + + @Override + public AsyncRemoveWatchesBuilder removeWatches() + { + return client.removeWatches(); + } + + @Override + public AsyncExistsBuilder checkExists() + { + return client.checkExists(); + } + + @Override + public AsyncGetDataBuilder getData() + { + return client.getData(); + } + + @Override + public AsyncGetChildrenBuilder getChildren() + { + return client.getChildren(); + } + + @Override + public AsyncGetConfigBuilder getConfig() + { + return client.getConfig(); + } +} diff --git a/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java new file mode 100644 index 0000000..f3c17ad --- /dev/null +++ b/curator-v2/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.AddWatchable; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.imps.AddWatchBuilderImpl; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.imps.Watching; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.api.AsyncPathable; +import org.apache.curator.x.async.api.AsyncWatchBuilder; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; + +import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; +import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + +class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> +{ + private final CuratorFrameworkImpl client; + private final Filters filters; + private Watching watching = null; + private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE; + + AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters) + { + this.client = client; + this.filters = filters; + } + + @Override + public AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode) + { + this.mode = mode; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncStage<Void> forPath(String path) + { + BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); + AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode); + return safeCall(common.internalCallback, () -> builder.forPath(path)); + } +} \ No newline at end of file diff --git a/curator-v2/src/test/java/org/apache/curator/v2/TestAsyncCuratorFrameworkV2.java b/curator-v2/src/test/java/org/apache/curator/v2/TestAsyncCuratorFrameworkV2.java new file mode 100644 index 0000000..1e38929 --- /dev/null +++ b/curator-v2/src/test/java/org/apache/curator/v2/TestAsyncCuratorFrameworkV2.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.v2; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.v2.AsyncCuratorFrameworkV2; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; + +public class TestAsyncCuratorFrameworkV2 extends CuratorTestBase +{ + @Test + public void testPersistentRecursiveWatch() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + AsyncCuratorFrameworkV2 async = AsyncCuratorFrameworkV2.wrap(client); + + client.start(); + client.blockUntilConnected(); + + CountDownLatch latch = new CountDownLatch(5); + Watcher watcher = event -> latch.countDown(); + async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test").toCompletableFuture().get(); + + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/a/b"); + client.create().forPath("/test/a/b/c"); + client.create().forPath("/test/a/b/c/d"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } +} diff --git a/curator-v2/src/test/java/org/apache/curator/v2/TestFrameworkV2.java b/curator-v2/src/test/java/org/apache/curator/v2/TestFrameworkV2.java new file mode 100644 index 0000000..f939e1c --- /dev/null +++ b/curator-v2/src/test/java/org/apache/curator/v2/TestFrameworkV2.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.v2; + +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.utils.ZookeeperFactory; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; + +import static org.apache.curator.v2.CuratorFrameworkV2.wrap; + +public class TestFrameworkV2 extends CuratorTestBase +{ + + @Test + public void testPersistentRecursiveWatch() throws Exception + { + try ( CuratorFrameworkV2 client = wrap(CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) ) + { + client.start(); + client.blockUntilConnected(); + + CountDownLatch latch = new CountDownLatch(5); + Watcher watcher = event -> latch.countDown(); + client.watches().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test"); + + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/a/b"); + client.create().forPath("/test/a/b/c"); + client.create().forPath("/test/a/b/c/d"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } + @Test + public void testPersistentRecursiveDefaultWatch() throws Exception + { + CountDownLatch latch = new CountDownLatch(5); + ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> { + Watcher actualWatcher = event -> { + watcher.process(event); + latch.countDown(); + }; + return new ZooKeeper(connectString, sessionTimeout, actualWatcher); + }; + try (CuratorFrameworkV2 client = wrap(CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build()) ) + { + client.start(); + client.blockUntilConnected(); + + client.watches().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test"); + + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/a/b"); + client.create().forPath("/test/a/b/c"); + client.create().forPath("/test/a/b/c/d"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } +} diff --git a/curator-v2/src/test/resources/log4j.properties b/curator-v2/src/test/resources/log4j.properties new file mode 100644 index 0000000..2a85e0d --- /dev/null +++ b/curator-v2/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=ERROR, console + +log4j.logger.org.apache.curator=DEBUG, console +log4j.additivity.org.apache.curator=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index 167cf50..f13d311 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -221,6 +221,16 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework return new AsyncGetConfigBuilderImpl(client, filters, getBuilderWatchMode()); } + Filters getFilters() + { + return filters; + } + + CuratorFrameworkImpl getClient() + { + return client; + } + private WatchMode getBuilderWatchMode() { return watched ? watchMode : null; diff --git a/pom.xml b/pom.xml index 3a5e152..731cc84 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ <jdk-version>1.${short-jdk-version}</jdk-version> <!-- versions --> - <zookeeper-version>3.5.5</zookeeper-version> + <zookeeper-version>3.5.6</zookeeper-version> <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version> <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version> <doxia-module-confluence-version>1.8</doxia-module-confluence-version> @@ -85,25 +85,27 @@ <guava-failureaccess-version>1.0.1</guava-failureaccess-version> <testng-version>6.14.3</testng-version> <swift-version>0.23.1</swift-version> - <dropwizard-version>1.3.7</dropwizard-version> <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version> <slf4j-version>1.7.25</slf4j-version> <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version> + <dropwizard-version>3.2.5</dropwizard-version> + <snappy-version>1.1.7</snappy-version> <!-- OSGi Properties --> - <osgi.export.package /> - <osgi.import.package /> - <osgi.private.package /> - <osgi.dynamic.import /> - <osgi.require.bundle /> - <osgi.export.service /> - <osgi.activator /> + <osgi.export.package/> + <osgi.import.package/> + <osgi.private.package/> + <osgi.dynamic.import/> + <osgi.require.bundle/> + <osgi.export.service/> + <osgi.activator/> </properties> <scm> <url>https://github.com/apache/curator.git</url> <connection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</connection> - <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</developerConnection> + <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git + </developerConnection> <tag>apache-curator-3.2.0</tag> </scm> @@ -318,6 +320,7 @@ <module>curator-x-discovery-server</module> <module>curator-x-async</module> <module>curator-test-zk34</module> + <module>curator-v2</module> </modules> <dependencyManagement> @@ -567,6 +570,24 @@ <artifactId>dropwizard-logging</artifactId> <version>${dropwizard-version}</version> </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${dropwizard-version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>${snappy-version}</version> + </dependency> </dependencies> </dependencyManagement> @@ -582,7 +603,6 @@ <artifactId>maven-javadoc-plugin</artifactId> <version>${maven-javadoc-plugin-version}</version> <configuration> - <aggregate>true</aggregate> <additionalJOptions> <additionalJOption>-J-Xmx1g</additionalJOption> </additionalJOptions> @@ -872,7 +892,8 @@ <relocations> <relocation> <pattern>com.google</pattern> - <shadedPattern>org.apache.curator.shaded.com.google</shadedPattern> + <shadedPattern>org.apache.curator.shaded.com.google + </shadedPattern> <excludes> <exclude>com.google.common.base.Function</exclude> <exclude>com.google.common.base.Predicate</exclude> diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility.confluence index ed6e32e..cd65784 100644 --- a/src/site/confluence/zk-compatibility.confluence +++ b/src/site/confluence/zk-compatibility.confluence @@ -1,15 +1,46 @@ h1. ZooKeeper Version Compatibility -While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is -used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator -4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0 -both versions of ZooKeeper are supported via the same Curator libraries. +There are multiple active version lines of ZooKeeper used in production by users. Curator can act +as a client for any of these versions. h2. ZooKeeper 3.5.x -* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x +* Curator 4.x has a hard dependency on ZooKeeper 3.5.x * If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0 +h2. ZooKeeper 3.6.x + +ZooKeeper 3.6.x is the newest version of ZooKeeper which adds new features such as +Persistent/Recursive watchers. To use Curator +X.X with ZooKeeper 3.6.x use the dependency +{{curator-v2}} instead of {{curator-recipes}}. Create a {{CuratorFramework}} instance in the +normal manner. To get access to new features, wrap the CuratorFramework in a CuratorFrameworkV2 +instances. E.g. + +{code} +CuratorFramework client = CuratorFrameworkFactory... +CuratorFrameworkV2 clientV2 = CuratorFrameworkV2.wrap(client); + +client.watches().add()... +{code} + +_Maven_ + +{code} +<dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-v2</artifactId> + <version>${curator-version}</version> +</dependency> +{code} + +_Gradle_ + +{code} +compile 'org.apache.curator:curator-v2:$curatorVersion' +{code} + + h2. ZooKeeper 3.4.x Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
