This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch zk36 in repository https://gitbox.apache.org/repos/asf/curator.git
commit c96a96450d8a3cb1c6c82be70bd19094a2c9e6f5 Author: randgalt <[email protected]> AuthorDate: Sat Nov 2 11:40:44 2019 -0500 Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x --- .../org/apache/curator/utils/Compatibility.java | 35 ++++ .../curator/utils/DefaultZookeeperFactory.java | 17 ++ .../apache/curator/framework/CuratorFramework.java | 9 + .../curator/framework/api/AddWatchBuilder.java | 18 +- .../curator/framework/api/AddWatchBuilder2.java | 14 +- .../apache/curator/framework/api/AddWatchable.java | 28 ++- .../curator/framework/api/CuratorEventType.java | 7 +- .../curator/framework/api/WatchesBuilder.java | 20 ++- .../framework/imps/AddWatchBuilderImpl.java | 197 +++++++++++++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 17 ++ .../imps/CuratorMultiTransactionRecord.java | 34 ++-- .../framework/imps/ReconfigBuilderImpl.java | 5 +- .../framework/imps/RemoveWatchesBuilderImpl.java | 9 +- .../curator/framework/imps/WatchesBuilderImpl.java | 26 ++- .../apache/curator/framework/imps/Watching.java | 10 +- .../framework/imps/TestCreateReturningStat.java | 3 +- .../curator/framework/imps/TestFramework.java | 9 +- .../curator/framework/imps/TestFrameworkEdges.java | 4 +- .../framework/imps/TestReconfiguration.java | 3 +- .../curator/framework/imps/TestTtlNodes.java | 3 +- .../framework/imps/TestWatcherRemovalManager.java | 3 +- ...tRemoveWatches.java => TestWatchesBuilder.java} | 64 ++++++- .../curator/framework/imps/TestWithCluster.java | 2 + .../state/TestConnectionStateManager.java | 2 + .../framework/recipes/cache/BaseTestTreeCache.java | 4 +- .../framework/recipes/cache/TestNodeCache.java | 2 + .../recipes/cache/TestPathChildrenCache.java | 2 + .../cache/TestPathChildrenCacheInCluster.java | 2 + .../framework/recipes/cache/TestTreeCache.java | 2 + .../recipes/leader/ChaosMonkeyCnxnFactory.java | 8 +- 
.../framework/recipes/leader/TestLeaderLatch.java | 2 + .../locks/TestInterProcessSemaphoreCluster.java | 2 + .../recipes/nodes/TestPersistentTtlNode.java | 3 +- curator-test-zk34/pom.xml | 2 +- {curator-test-zk34 => curator-test-zk35}/pom.xml | 112 +++++------- .../curator/framework/TestCompatibility.java | 49 +++++ .../src/test/resources/log4j.properties | 27 +++ curator-test/pom.xml | 10 ++ .../org/apache/curator/test/BaseClassForTests.java | 4 +- .../org/apache/curator/test/Compatibility.java | 93 ++++++++++ .../org/apache/curator/test/TestingCluster.java | 2 +- .../apache/curator/test/TestingQuorumPeerMain.java | 2 +- .../apache/curator/test/TestingZooKeeperMain.java | 6 +- .../test/compatibility/CuratorTestBase.java | 5 +- .../test/compatibility/Zk35MethodInterceptor.java | 56 ------ .../x/async/api/AsyncCuratorFrameworkDsl.java | 7 + .../curator/x/async/api/AsyncWatchBuilder.java | 26 +-- .../curator/x/async/api/AsyncWatchBuilder2.java | 16 +- .../x/async/details/AsyncCuratorFrameworkImpl.java | 23 ++- .../x/async/details/AsyncWatchBuilderImpl.java | 79 +++++++++ .../curator/framework/imps/TestAddWatch.java | 69 ++++++++ pom.xml | 45 +++-- src/site/confluence/zk-compatibility.confluence | 41 ++++- 53 files changed, 983 insertions(+), 257 deletions(-) diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java index 1ee2301..fd98d88 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java +++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java @@ -31,7 +31,9 @@ import java.lang.reflect.Method; public class Compatibility { private static final boolean hasZooKeeperAdmin; + private static final boolean hasPersistentWatchers; private static final Method queueEventMethod; + private static final Logger logger = LoggerFactory.getLogger(Compatibility.class); static @@ -49,6 +51,19 @@ public class Compatibility } 
hasZooKeeperAdmin = localHasZooKeeperAdmin; + boolean localHasPersistentWatchers; + try + { + Class.forName("org.apache.zookeeper.AddWatchMode"); + localHasPersistentWatchers = true; + } + catch ( ClassNotFoundException e ) + { + localHasPersistentWatchers = false; + logger.info("Persistent Watchers are not available in the version of the ZooKeeper library being used"); + } + hasPersistentWatchers = localHasPersistentWatchers; + Method localQueueEventMethod; try { @@ -74,6 +89,26 @@ public class Compatibility } /** + * Return true if the ZooKeeperAdmin class is available + * + * @return true/false + */ + public static boolean hasZooKeeperAdmin() + { + return hasZooKeeperAdmin; + } + + /** + * Return true if persistent watchers are available + * + * @return true/false + */ + public static boolean hasPersistentWatchers() + { + return hasPersistentWatchers; + } + + /** * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>. * For ZooKeeper 3.4.x do the equivalent via reflection * diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java index 42279d0..f936518 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java @@ -20,12 +20,29 @@ package org.apache.curator.utils; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.admin.ZooKeeperAdmin; public class DefaultZookeeperFactory implements ZookeeperFactory { + // hide org.apache.zookeeper.admin.ZooKeeperAdmin in a nested class so that Curator continues to work with ZK 3.4.x + private static class ZooKeeperAdminMaker implements ZookeeperFactory + { + static final ZooKeeperAdminMaker instance = new ZooKeeperAdminMaker(); + + @Override + public ZooKeeper newZooKeeper(String 
connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception + { + return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly); + } + } + @Override public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception { + if ( Compatibility.hasZooKeeperAdmin() ) + { + return ZooKeeperAdminMaker.instance.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); + } return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 3737faa..be5e1cb 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -194,11 +194,20 @@ public interface CuratorFramework extends Closeable /** * Start a remove watches builder. + * * @return builder object + * @deprecated use {@link #watchers()} in ZooKeeper 3.6+ */ public RemoveWatchesBuilder watches(); /** + * Start a watches builder. 
+ * + * @return builder object + */ + public WatchesBuilder watchers(); + + /** * Returns the listenable interface for the Connect State * * @return listenable diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java similarity index 68% copy from curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java index a3c2a29..ad6d434 100644 --- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java @@ -16,13 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test.compatibility; +package org.apache.curator.framework.api; -import org.apache.curator.test.BaseClassForTests; -import org.testng.annotations.Listeners; +import org.apache.zookeeper.AddWatchMode; -@Listeners(Zk35MethodInterceptor.class) -public class CuratorTestBase extends BaseClassForTests +public interface AddWatchBuilder extends AddWatchBuilder2 { - protected final Timing2 timing = new Timing2(); -} + /** + * The mode to use. 
By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used + * + * @param mode mode to use + * @return this + */ + AddWatchBuilder2 withMode(AddWatchMode mode); +} \ No newline at end of file diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java similarity index 81% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java index 5b4b53f..9114c00 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java @@ -16,12 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; -public class Compatibility +package org.apache.curator.framework.api; + +public interface AddWatchBuilder2 extends + Backgroundable<AddWatchable<Pathable<Void>>>, + AddWatchable<Pathable<Void>>, + Pathable<Void> { - public static boolean isZK34() - { - return false; - } -} +} \ No newline at end of file diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java similarity index 68% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java index 42279d0..1f0646c 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java @@ -16,16 +16,26 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.curator.utils; + +package org.apache.curator.framework.api; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -public class DefaultZookeeperFactory implements ZookeeperFactory +public interface AddWatchable<T> { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(Watcher watcher); + + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(CuratorWatcher watcher); +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java index 5dea211..89e1490 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java @@ -96,5 +96,10 @@ public enum CuratorEventType /** * Event sent when client is being closed */ - CLOSING + CLOSING, + + /** + * Corresponds to {@link org.apache.curator.framework.CuratorFramework#watchers()} + */ + ADD_WATCH } diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java similarity index 75% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java index 5b4b53f..3cd5528 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java @@ 
-16,12 +16,18 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; -public class Compatibility +package org.apache.curator.framework.api; + +/** + * Builder to allow watches to be removed + */ +public interface WatchesBuilder extends RemoveWatchesBuilder { - public static boolean isZK34() - { - return false; - } -} + /** + * Start an add watch operation + * + * @return builder + */ + AddWatchBuilder add(); +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java new file mode 100644 index 0000000..9f119a3 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework.imps; + +import org.apache.curator.RetryLoop; +import org.apache.curator.drivers.OperationTrace; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.AddWatchBuilder; +import org.apache.curator.framework.api.AddWatchBuilder2; +import org.apache.curator.framework.api.AddWatchable; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; +import java.util.concurrent.Executor; + +public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String> +{ + private final CuratorFrameworkImpl client; + private Watching watching; + private Backgrounding backgrounding = new Backgrounding(); + private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE; + + AddWatchBuilderImpl(CuratorFrameworkImpl client) + { + this.client = client; + watching = new Watching(client, true); + } + + public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode) + { + this.client = client; + this.watching = watching; + this.backgrounding = backgrounding; + this.mode = mode; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground() + { + backgrounding = new Backgrounding(); + return this; + } + + @Override + public AddWatchBuilder2 withMode(AddWatchMode mode) + { + this.mode = mode; + return this; + } + + @Override + public Pathable<Void> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public Pathable<Void> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(Object context) + 
{ + backgrounding = new Backgrounding(context); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback) + { + backgrounding = new Backgrounding(callback); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context) + { + backgrounding = new Backgrounding(callback, context); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor) + { + backgrounding = new Backgrounding(callback, executor); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; + } + + @Override + public Void forPath(String path) throws Exception + { + if ( backgrounding.inBackground() ) + { + client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); + } + else + { + pathInForeground(path); + } + return null; + } + + @Override + public void performBackgroundOperation(final OperationAndData<String> data) throws Exception + { + String path = data.getData(); + String fixedPath = client.fixForNamespace(path); + try + { + final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background"); + if ( watching.isWatched() ) + { + client.getZooKeeper().addWatch + ( + fixedPath, + mode, + (rc, path1, ctx) -> { + trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null); + client.processBackgroundOperation(data, event); + }, + backgrounding.getContext() + ); + } + else + { + client.getZooKeeper().addWatch + ( + fixedPath, + watching.getWatcher(path), + 
mode, + (rc, path1, ctx) -> { + trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null); + client.processBackgroundOperation(data, event); + }, + backgrounding.getContext() + ); + } + } + catch ( Throwable e ) + { + backgrounding.checkError(e, watching); + } + } + + private void pathInForeground(final String path) throws Exception + { + final String fixedPath = client.fixForNamespace(path); + OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground"); + RetryLoop.callWithRetry + ( + client.getZookeeperClient(), () -> { + if ( watching.isWatched() ) + { + client.getZooKeeper().addWatch(fixedPath, mode); + } + else + { + client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode); + } + return null; + }); + trace.setPath(fixedPath).setWithWatcher(true).commit(); + } +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index e003bf0..5deb8f9 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -42,6 +42,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateManager; +import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; import org.apache.curator.utils.ThreadUtils; @@ -571,6 +572,13 @@ public class CuratorFrameworkImpl implements CuratorFramework return new 
RemoveWatchesBuilderImpl(this); } + @Override + public WatchesBuilder watchers() + { + Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used. Use watches() instead."); + return new WatchesBuilderImpl(this); + } + protected void internalSync(CuratorFrameworkImpl impl, String path, Object context) { BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context); @@ -620,6 +628,15 @@ public class CuratorFrameworkImpl implements CuratorFramework return client.getZooKeeper(); } + Object getZooKeeperAdmin() throws Exception + { + if ( isZk34CompatibilityMode() ) + { + Preconditions.checkState(!isZk34CompatibilityMode(), "getZooKeeperAdmin() is not supported when running in ZooKeeper 3.4 compatibility mode"); + } + return client.getZooKeeper(); + } + CompressionProvider getCompressionProvider() { return compressionProvider; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java index 3e72609..fbac6e6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java @@ -16,49 +16,57 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.curator.framework.imps; import com.google.common.collect.Lists; import org.apache.curator.framework.api.transaction.OperationType; import org.apache.curator.framework.api.transaction.TypeAndPath; -import org.apache.zookeeper.MultiTransactionRecord; import org.apache.zookeeper.Op; import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; -class CuratorMultiTransactionRecord extends MultiTransactionRecord +class CuratorMultiTransactionRecord implements Iterable<Op> { - private final List<TypeAndPath> metadata = Lists.newArrayList(); - - @Override - public final void add(Op op) - { - throw new UnsupportedOperationException(); - } + private final List<TypeAndPath> metadata = Lists.newArrayList(); + private List<Op> ops = new ArrayList<>(); void add(Op op, OperationType type, String forPath) { - super.add(op); + ops.add(op); metadata.add(new TypeAndPath(type, forPath)); } - TypeAndPath getMetadata(int index) + TypeAndPath getMetadata(int index) { return metadata.get(index); } - int metadataSize() + int metadataSize() { return metadata.size(); } void addToDigest(MessageDigest digest) { - for ( Op op : this ) + for ( Op op : ops ) { digest.update(op.getPath().getBytes()); digest.update(Integer.toString(op.getType()).getBytes()); digest.update(op.toRequestRecord().toString().getBytes()); } } + + @Override + public Iterator<Op> iterator() + { + return ops.iterator(); + } + + int size() + { + return ops.size(); + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 97be59a..9f129ca 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -24,6 +24,7 @@ import org.apache.curator.RetryLoop; import 
org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.admin.ZooKeeperAdmin; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import java.util.Arrays; @@ -268,7 +269,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation client.processBackgroundOperation(data, event); } }; - client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); + ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); } catch ( Throwable e ) { @@ -287,7 +288,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation @Override public byte[] call() throws Exception { - return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat); + return ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat); } } ); diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index e14deff..961d5f0 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -201,8 +201,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat } return null; - } - + } + + protected CuratorFrameworkImpl getClient() + { + return client; + } + private void pathInBackground(final String path) { OperationAndData.ErrorCallback<String> errorCallback = null; diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java similarity index 50% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java index 42279d0..4a273c6 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java @@ -16,16 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.api.AddWatchBuilder; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.WatchesBuilder; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher.WatcherType; -public class DefaultZookeeperFactory implements ZookeeperFactory +public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder { + public WatchesBuilderImpl(CuratorFrameworkImpl client) + { + super(client); + } + + public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding) + { + super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding); + } + @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception + public AddWatchBuilder add() { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); + return new AddWatchBuilderImpl(getClient()); } -} +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index daa5dd3..5bad7e7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -class Watching +public class Watching { private final Watcher watcher; private final CuratorWatcher curatorWatcher; @@ -31,7 +31,7 @@ class Watching private final CuratorFrameworkImpl client; private NamespaceWatcher namespaceWatcher; - Watching(CuratorFrameworkImpl client, boolean watched) + public Watching(CuratorFrameworkImpl client, boolean watched) { this.client = client; this.watcher = null; @@ -39,7 +39,7 @@ class Watching this.watched = watched; } - Watching(CuratorFrameworkImpl client, Watcher watcher) + public Watching(CuratorFrameworkImpl client, Watcher watcher) { this.client = client; this.watcher = watcher; @@ -47,7 +47,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) + public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) { this.client = client; this.watcher = null; @@ -55,7 +55,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client) + public Watching(CuratorFrameworkImpl client) { this.client = client; watcher = null; diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java index 034791d..5a52cf0 100755 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCreateReturningStat.java @@ -26,7 +26,6 @@ import 
org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.Timing; -import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.data.Stat; import org.testng.Assert; @@ -34,7 +33,7 @@ import org.testng.annotations.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -@Test(groups = Zk35MethodInterceptor.zk35Group) +@Test(groups = CuratorTestBase.zk35Group) public class TestCreateReturningStat extends CuratorTestBase { private CuratorFramework createClient() diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java index fe49ad7..97c8a33 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java @@ -31,8 +31,8 @@ import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; -import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.EnsurePath; import org.apache.curator.utils.ZKPaths; @@ -59,9 +59,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @SuppressWarnings("deprecation") +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestFramework extends BaseClassForTests { - @BeforeMethod + @BeforeMethod(alwaysRun = true) @Override public void setup() throws 
Exception { @@ -69,7 +70,7 @@ public class TestFramework extends BaseClassForTests super.setup(); } - @AfterMethod + @AfterMethod(alwaysRun = true) @Override public void teardown() throws Exception { @@ -77,7 +78,7 @@ public class TestFramework extends BaseClassForTests super.teardown(); } - @Test(groups = Zk35MethodInterceptor.zk35Group) + @Test(groups = CuratorTestBase.zk35Group) public void testWaitForShutdownTimeoutMs() throws Exception { final BlockingQueue<Integer> timeoutQueue = new ArrayBlockingQueue<>(1); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index 7c6d156..f89fff9 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -40,13 +40,13 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingServer; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -63,9 +63,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class 
TestFrameworkEdges extends BaseClassForTests { private final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index c6ff2bb..74b52fa 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -32,7 +32,6 @@ import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingZooKeeperServer; import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; -import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -57,7 +56,7 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -@Test(groups = Zk35MethodInterceptor.zk35Group) +@Test(groups = {CuratorTestBase.zk35Group, CuratorTestBase.zk35CompatibilityGroup}) public class TestReconfiguration extends CuratorTestBase { private final Timing2 timing = new Timing2(); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java index e2156df..6c5026a 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestTtlNodes.java @@ -25,7 +25,6 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.Timing; -import 
org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.zookeeper.CreateMode; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -34,7 +33,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.concurrent.CountDownLatch; -@Test(groups = Zk35MethodInterceptor.zk35Group) +@Test(groups = CuratorTestBase.zk35Group) public class TestTtlNodes extends CuratorTestBase { @BeforeClass diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java index 74aac1d..8968e3e 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java @@ -27,7 +27,6 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.Timing; import org.apache.curator.test.WatchersDebug; -import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -37,7 +36,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -@Test(groups = Zk35MethodInterceptor.zk35Group) +@Test(groups = CuratorTestBase.zk35Group) public class TestWatcherRemovalManager extends CuratorTestBase { @Test diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java similarity index 88% rename from curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java rename to 
curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java index 63d8931..26c41f1 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java @@ -30,23 +30,25 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryOneTime; -import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.Timing; -import org.apache.curator.test.compatibility.Zk35MethodInterceptor; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.ZookeeperFactory; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.WatcherType; +import org.apache.zookeeper.ZooKeeper; import org.testng.Assert; import org.testng.annotations.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -@Test(groups = Zk35MethodInterceptor.zk35Group) -public class TestRemoveWatches extends CuratorTestBase +@Test(groups = CuratorTestBase.zk35Group) +public class TestWatchesBuilder extends CuratorTestBase { private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client) { @@ -610,8 +612,58 @@ public class TestRemoveWatches extends CuratorTestBase { CloseableUtils.closeQuietly(client); } - } - + } + + @Test(groups = CuratorTestBase.zk36Group) + public void testPersistentRecursiveWatch() throws Exception + { + try ( CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + client.blockUntilConnected(); + + CountDownLatch latch = new CountDownLatch(5); + Watcher watcher = event -> latch.countDown(); + client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test"); + + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/a/b"); + client.create().forPath("/test/a/b/c"); + client.create().forPath("/test/a/b/c/d"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } + + @Test(groups = CuratorTestBase.zk36Group) + public void testPersistentRecursiveDefaultWatch() throws Exception + { + CountDownLatch latch = new CountDownLatch(6); // 5 creates plus the initial sync + ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> { + Watcher actualWatcher = event -> { + watcher.process(event); + latch.countDown(); + }; + return new ZooKeeper(connectString, sessionTimeout, actualWatcher); + }; + try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() ) + { + client.start(); + client.blockUntilConnected(); + + client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test"); + + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/a/b"); + client.create().forPath("/test/a/b/c"); + client.create().forPath("/test/a/b/c/d"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } + private static class CountDownWatcher implements Watcher { private String path; private EventType eventType; diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java index 7e8ffbb..fbd6e2a 100644 --- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWithCluster.java @@ -18,6 +18,7 @@ */ package org.apache.curator.framework.imps; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -33,6 +34,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import java.util.concurrent.CountDownLatch; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestWithCluster { @Test diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java index c929b41..63ccefe 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java @@ -23,6 +23,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; @@ -30,6 +31,7 @@ import org.testng.annotations.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestConnectionStateManager extends BaseClassForTests { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java index 175ccdf..246704f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java @@ -96,7 +96,7 @@ public class BaseTestTreeCache extends BaseClassForTests } @Override - @BeforeMethod + @BeforeMethod(alwaysRun = true) public void setup() throws Exception { super.setup(); @@ -111,7 +111,7 @@ public class BaseTestTreeCache extends BaseClassForTests } @Override - @AfterMethod + @AfterMethod(alwaysRun = true) public void teardown() throws Exception { try diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java index ff416d5..872eee2 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -40,6 +41,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestNodeCache extends BaseClassForTests { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index 78fabd5..ac02090 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@ -32,6 +32,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.zookeeper.CreateMode; @@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.testng.AssertJUnit.assertNotNull; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestPathChildrenCache extends BaseClassForTests { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java index cd87125..3895625 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache; import com.google.common.collect.Queues; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -34,6 +35,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicReference; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestPathChildrenCacheInCluster extends BaseClassForTests { @Test(enabled = false) // this test is very flakey - it needs to be re-written at some point diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java index 1e97ce2..4c7052d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.Compatibility; import org.apache.zookeeper.CreateMode; @@ -31,6 +32,7 @@ import org.testng.annotations.Test; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestTreeCache extends BaseTestTreeCache { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java index 4cb342c..07e9a17 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java @@ -19,11 +19,11 @@ package org.apache.curator.framework.recipes.leader; +import org.apache.curator.test.Compatibility; import 
org.apache.curator.test.TestingZooKeeperMain; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.ByteBufferInputStream; -import org.apache.zookeeper.server.NIOServerCnxn; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZooKeeperServer; @@ -92,7 +92,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory log.debug("Rejected : " + si.toString()); // Still reject request log.debug("Still not ready for " + remaining + "ms"); - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); return; } // Submit the request to the legacy Zookeeper server @@ -113,13 +113,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory firstError = System.currentTimeMillis(); // The znode has been created, close the connection and don't tell it to client log.warn("Closing connection right after " + createRequest.getPath() + " creation"); - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); } } catch ( Exception e ) { // Should not happen - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); } } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 3d9e9b7..8f4e23f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -37,6 +37,7 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; 
import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; @@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestLeaderLatch extends BaseClassForTests { private static final String PATH_NAME = "/one/two/me"; diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java index ed56f15..add547e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java @@ -30,6 +30,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +@Test(groups = CuratorTestBase.zk35CompatibilityGroup) public class TestInterProcessSemaphoreCluster extends BaseClassForTests { @Test diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java index 360c876..049e1cd 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java +++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java @@ -26,7 +26,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.Timing; import org.apache.curator.test.compatibility.CuratorTestBase; -import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.ZKPaths; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -38,7 +37,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode.BUILD_INITIAL_CACHE; -@Test(groups = Zk35MethodInterceptor.zk35Group) +@Test(groups = CuratorTestBase.zk35Group) public class TestPersistentTtlNode extends CuratorTestBase { private final Timing timing = new Timing(); diff --git a/curator-test-zk34/pom.xml b/curator-test-zk34/pom.xml index 3d06cb0..606aba2 100644 --- a/curator-test-zk34/pom.xml +++ b/curator-test-zk34/pom.xml @@ -160,7 +160,7 @@ <dependency>org.apache.curator:curator-framework</dependency> <dependency>org.apache.curator:curator-recipes</dependency> </dependenciesToScan> - <excludedGroups>zk35</excludedGroups> + <excludedGroups>zk35,zk36</excludedGroups> </configuration> </plugin> diff --git a/curator-test-zk34/pom.xml b/curator-test-zk35/pom.xml similarity index 72% copy from curator-test-zk34/pom.xml copy to curator-test-zk35/pom.xml index 3d06cb0..5803893 100644 --- a/curator-test-zk34/pom.xml +++ b/curator-test-zk35/pom.xml @@ -1,27 +1,56 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>apache-curator</artifactId> <groupId>org.apache.curator</groupId> + <artifactId>apache-curator</artifactId> <version>4.2.1-SNAPSHOT</version> 
</parent> <modelVersion>4.0.0</modelVersion> - <artifactId>curator-test-zk34</artifactId> - - <name>Curator ZooKeeper 3.4 Testing</name> - <description>Tests for ZoKeeper 3.4 compatibility</description> - <inceptionYear>2017</inceptionYear> + <artifactId>curator-test-zk35</artifactId> <properties> - <zookeeper-34-version>3.4.8</zookeeper-34-version> + <zookeeper-35-version>3.5.6</zookeeper-35-version> </properties> <dependencies> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-async</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <version>${zookeeper-34-version}</version> + <version>${zookeeper-35-version}</version> <exclusions> <exclusion> <groupId>com.sun.jmx</groupId> @@ -49,7 +78,6 @@ <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> - <version>2.12.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> @@ -61,31 +89,6 @@ <dependency> <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.curator</groupId> - 
<artifactId>curator-framework</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <exclusions> <exclusion> @@ -93,12 +96,13 @@ <artifactId>zookeeper</artifactId> </exclusion> </exclusions> + <type>test-jar</type> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> + <artifactId>curator-framework</artifactId> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> @@ -120,18 +124,6 @@ <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-math</artifactId> - <scope>test</scope> - </dependency> </dependencies> <build> @@ -160,32 +152,10 @@ <dependency>org.apache.curator:curator-framework</dependency> <dependency>org.apache.curator:curator-recipes</dependency> </dependenciesToScan> - <excludedGroups>zk35</excludedGroups> + <groups>zk35,zk35Compatibility</groups> + <excludedGroups>zk36</excludedGroups> </configuration> </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-curator-test-classes</id> - <phase>validate</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/generated-test-sources/test-annotations/org/apache/curator/test/compatibility</outputDirectory> - <resources> - <resource> - <directory>../curator-test/src/main/java/org/apache/curator/test/compatibility</directory> - <filtering>false</filtering> - </resource> - </resources> - 
</configuration> - </execution> - </executions> - </plugin> </plugins> </build> </project> diff --git a/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java new file mode 100644 index 0000000..5112d41 --- /dev/null +++ b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework; + +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.testng.annotations.Test; + +public class TestCompatibility extends CuratorTestBase +{ + @Test(expectedExceptions = IllegalStateException.class) + public void testPersistentWatchesNotAvailable() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + client.watchers().add().forPath("/foo"); + } + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testPersistentWatchesNotAvailableAsync() + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.addWatch().forPath("/foo"); + } + } +} diff --git a/curator-test-zk35/src/test/resources/log4j.properties b/curator-test-zk35/src/test/resources/log4j.properties new file mode 100644 index 0000000..2a85e0d --- /dev/null +++ b/curator-test-zk35/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=ERROR, console + +log4j.logger.org.apache.curator=DEBUG, console +log4j.additivity.org.apache.curator=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n diff --git a/curator-test/pom.xml b/curator-test/pom.xml index 3683b7d..4f0c5a2 100644 --- a/curator-test/pom.xml +++ b/curator-test/pom.xml @@ -41,6 +41,16 @@ </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index 51af821..79c8b9a 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -113,7 +113,7 @@ public class BaseClassForTests context.getSuite().addListener(methodListener2); } - @BeforeMethod + @BeforeMethod(alwaysRun = true) public void setup() throws Exception { if ( INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES != null ) @@ -142,7 +142,7 @@ public class BaseClassForTests } } - @AfterMethod + @AfterMethod(alwaysRun = true) public void teardown() throws Exception { System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY); diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java index 5b4b53f..1f8d181 100644 --- 
a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java @@ -18,10 +18,103 @@ */ package org.apache.curator.test; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerCnxnFactory; +import java.lang.reflect.Method; + +@SuppressWarnings("unchecked") public class Compatibility { + private static final Method closeAllWithReasonMethod; + private static final Method closeAllMethod; + private static final Method closeWithReasonMethod; + private static final Method closeMethod; + private static final Object disconnectReasonObj; + + static + { + Object localDisconnectReasonObj; + Method localCloseAllWithReasonMethod; + Method localCloseAllMethod; + Method localCloseWithReasonMethod; + Method localCloseMethod; + try + { + Class disconnectReasonClass = Class.forName("org.apache.zookeeper.server.ServerCnxn$DisconnectReason"); + localDisconnectReasonObj = Enum.valueOf(disconnectReasonClass, "UNKNOWN"); + localCloseAllWithReasonMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll", disconnectReasonClass); + localCloseWithReasonMethod = ServerCnxn.class.getDeclaredMethod("close", disconnectReasonClass); + localCloseAllMethod = null; + localCloseMethod = null; + + localCloseAllWithReasonMethod.setAccessible(true); + localCloseWithReasonMethod.setAccessible(true); + } + catch ( Throwable e ) + { + localDisconnectReasonObj = null; + localCloseAllWithReasonMethod = null; + localCloseWithReasonMethod = null; + try + { + localCloseAllMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll"); + localCloseMethod = ServerCnxn.class.getDeclaredMethod("close"); + + localCloseAllMethod.setAccessible(true); + localCloseMethod.setAccessible(true); + } + catch ( Throwable ex ) + { + throw new IllegalStateException("Could not reflectively find ServerCnxnFactory/ServerCnxn close methods"); + } + } + disconnectReasonObj = localDisconnectReasonObj; + 
closeAllWithReasonMethod = localCloseAllWithReasonMethod; + closeAllMethod = localCloseAllMethod; + closeMethod = localCloseMethod; + closeWithReasonMethod = localCloseWithReasonMethod; + } + public static boolean isZK34() { return false; } + + public static void serverCnxnFactoryCloseAll(ServerCnxnFactory factory) + { + try + { + if ( closeAllMethod != null ) + { + closeAllMethod.invoke(factory); + } + else + { + closeAllWithReasonMethod.invoke(factory, disconnectReasonObj); + } + } + catch ( Exception e ) + { + throw new RuntimeException("Could not close factory", e); + } + } + + public static void serverCnxnClose(ServerCnxn cnxn) + { + try + { + if ( closeMethod != null ) + { + closeMethod.invoke(cnxn); + } + else + { + closeWithReasonMethod.invoke(cnxn, disconnectReasonObj); + } + } + catch ( Exception e ) + { + throw new RuntimeException("Could not close connection", e); + } + } } diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java index 3d38fe1..58da2c0 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java @@ -225,7 +225,7 @@ public class TestingCluster implements Closeable */ public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception { - Method m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress"); + Method m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress"); m.setAccessible(true); InetSocketAddress address = (InetSocketAddress)m.invoke(client); if ( address != null ) diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java index 3b3ab26..7489527 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java +++ 
b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java @@ -39,7 +39,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace Field cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory"); cnxnFactoryField.setAccessible(true); ServerCnxnFactory cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer); - cnxnFactory.closeAll(); + Compatibility.serverCnxnFactoryCloseAll(cnxnFactory); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java index 841df77..91f185f 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java @@ -81,7 +81,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { if ( cnxnFactory != null ) { - cnxnFactory.closeAll(); + Compatibility.serverCnxnFactoryCloseAll(cnxnFactory); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); @@ -262,7 +262,9 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config) { - super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null); + this.setTxnLogFactory(txnLog); + this.setMinSessionTimeout(config.getMinSessionTimeout()); + this.setMaxSessionTimeout(config.getMaxSessionTimeout()); } private final AtomicBoolean isRunning = new AtomicBoolean(false); diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java index a3c2a29..c5d4018 100644 --- a/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java +++ 
b/curator-test/src/main/java/org/apache/curator/test/compatibility/CuratorTestBase.java @@ -21,8 +21,11 @@ package org.apache.curator.test.compatibility; import org.apache.curator.test.BaseClassForTests; import org.testng.annotations.Listeners; -@Listeners(Zk35MethodInterceptor.class) public class CuratorTestBase extends BaseClassForTests { + public static final String zk35Group = "zk35"; + public static final String zk36Group = "zk36"; + public static final String zk35CompatibilityGroup = "zk35Compatibility"; + protected final Timing2 timing = new Timing2(); } diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java deleted file mode 100644 index 8072b68..0000000 --- a/curator-test/src/main/java/org/apache/curator/test/compatibility/Zk35MethodInterceptor.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.curator.test.compatibility; - -import org.apache.curator.test.Compatibility; -import org.testng.IMethodInstance; -import org.testng.IMethodInterceptor; -import org.testng.ITestContext; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -public class Zk35MethodInterceptor implements IMethodInterceptor -{ - public static final String zk35Group = "zk35"; - - @Override - public List<IMethodInstance> intercept(List<IMethodInstance> methods, ITestContext context) - { - if ( !Compatibility.isZK34() ) - { - return methods; - } - - List<IMethodInstance> filteredMethods = new ArrayList<>(); - for ( IMethodInstance method : methods ) - { - if ( !isInGroup(method.getMethod().getGroups()) ) - { - filteredMethods.add(method); - } - } - return filteredMethods; - } - - private boolean isInGroup(String[] groups) - { - return (groups != null) && Arrays.asList(groups).contains(zk35Group); - } -} diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java index bc66bb6..e54b148 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java @@ -112,4 +112,11 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework * @return builder object */ AsyncRemoveWatchesBuilder removeWatches(); + + /** + * Start an add watch builder + * + * @return builder object + */ + AsyncWatchBuilder addWatch(); } diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java similarity index 60% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to 
curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java index 42279d0..a5e86ec 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java @@ -16,16 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; +package org.apache.curator.x.async.api; -public class DefaultZookeeperFactory implements ZookeeperFactory +import org.apache.curator.framework.api.AddWatchable; +import org.apache.curator.x.async.AsyncStage; +import org.apache.zookeeper.AddWatchMode; + +public interface AsyncWatchBuilder extends + AddWatchable<AsyncPathable<AsyncStage<Void>>>, + AsyncPathable<AsyncStage<Void>> { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} + /** + * The mode to use. 
By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used + * + * @param mode mode + * @return this + */ + AsyncWatchBuilder2 withMode(AddWatchMode mode); +} \ No newline at end of file diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java similarity index 74% copy from curator-test/src/main/java/org/apache/curator/test/Compatibility.java copy to curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java index 5b4b53f..a97cd4b 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java @@ -16,12 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.test; -public class Compatibility +package org.apache.curator.x.async.api; + +import org.apache.curator.framework.api.AddWatchable; +import org.apache.curator.x.async.AsyncStage; + +public interface AsyncWatchBuilder2 extends + AddWatchable<AsyncPathable<AsyncStage<Void>>>, + AsyncPathable<AsyncStage<Void>> { - public static boolean isZK34() - { - return false; - } -} +} \ No newline at end of file diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index 167cf50..a248c0e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -18,6 +18,7 @@ */ package org.apache.curator.x.async.details; +import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorEvent; import 
org.apache.curator.framework.api.UnhandledErrorListener; @@ -26,7 +27,10 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.CuratorMultiTransactionImpl; import org.apache.curator.framework.imps.GetACLBuilderImpl; import org.apache.curator.framework.imps.SyncBuilderImpl; -import org.apache.curator.x.async.*; +import org.apache.curator.utils.Compatibility; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.*; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; @@ -150,6 +154,13 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override + public AsyncWatchBuilder addWatch() + { + Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used."); + return new AsyncWatchBuilderImpl(client, filters); + } + + @Override public CuratorFramework unwrap() { return client; @@ -221,6 +232,16 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework return new AsyncGetConfigBuilderImpl(client, filters, getBuilderWatchMode()); } + Filters getFilters() + { + return filters; + } + + CuratorFrameworkImpl getClient() + { + return client; + } + private WatchMode getBuilderWatchMode() { return watched ? watchMode : null; diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java new file mode 100644 index 0000000..a10540e --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.async.details; + +import org.apache.curator.framework.api.AddWatchable; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.imps.AddWatchBuilderImpl; +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.imps.Watching; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.api.AsyncPathable; +import org.apache.curator.x.async.api.AsyncWatchBuilder; +import org.apache.curator.x.async.api.AsyncWatchBuilder2; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; + +import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; +import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + +class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AsyncWatchBuilder2, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> +{ + private final CuratorFrameworkImpl client; + private final Filters filters; + private Watching watching; + private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE; + + AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters) + { + this.client = client; + this.filters = filters; + watching = new Watching(client, 
true); + } + + @Override + public AsyncWatchBuilder2 withMode(AddWatchMode mode) + { + this.mode = mode; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncStage<Void> forPath(String path) + { + BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); + AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode); + return safeCall(common.internalCallback, () -> builder.forPath(path)); + } +} \ No newline at end of file diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java new file mode 100644 index 0000000..3be4900 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java @@ -0,0 +1,69 @@ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.utils.ZookeeperFactory; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; + +public class TestAddWatch extends CuratorTestBase +{ + @Test + public void testPersistentRecursiveWatch() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + 
client.start(); + client.blockUntilConnected(); + + CountDownLatch latch = new CountDownLatch(5); + Watcher watcher = event -> latch.countDown(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test").toCompletableFuture().get(); + + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/a/b"); + client.create().forPath("/test/a/b/c"); + client.create().forPath("/test/a/b/c/d"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } + + @Test + public void testPersistentRecursiveDefaultWatch() throws Exception + { + CountDownLatch latch = new CountDownLatch(6); // 5 creates plus the initial sync + ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> { + Watcher actualWatcher = event -> { + watcher.process(event); + latch.countDown(); + }; + return new ZooKeeper(connectString, sessionTimeout, actualWatcher); + }; + try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() ) + { + client.start(); + client.blockUntilConnected(); + + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test"); + + client.create().forPath("/test"); + client.create().forPath("/test/a"); + client.create().forPath("/test/a/b"); + client.create().forPath("/test/a/b/c"); + client.create().forPath("/test/a/b/c/d"); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + } +} diff --git a/pom.xml b/pom.xml index 3a5e152..e223de6 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ <jdk-version>1.${short-jdk-version}</jdk-version> <!-- versions --> - <zookeeper-version>3.5.5</zookeeper-version> + <zookeeper-version>3.6.0-SNAPSHOT</zookeeper-version> 
<maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version> <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version> <doxia-module-confluence-version>1.8</doxia-module-confluence-version> @@ -85,25 +85,27 @@ <guava-failureaccess-version>1.0.1</guava-failureaccess-version> <testng-version>6.14.3</testng-version> <swift-version>0.23.1</swift-version> - <dropwizard-version>1.3.7</dropwizard-version> <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version> <slf4j-version>1.7.25</slf4j-version> <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version> + <dropwizard-version>3.2.5</dropwizard-version> + <snappy-version>1.1.7</snappy-version> <!-- OSGi Properties --> - <osgi.export.package /> - <osgi.import.package /> - <osgi.private.package /> - <osgi.dynamic.import /> - <osgi.require.bundle /> - <osgi.export.service /> - <osgi.activator /> + <osgi.export.package/> + <osgi.import.package/> + <osgi.private.package/> + <osgi.dynamic.import/> + <osgi.require.bundle/> + <osgi.export.service/> + <osgi.activator/> </properties> <scm> <url>https://github.com/apache/curator.git</url> <connection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</connection> - <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git</developerConnection> + <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/curator.git + </developerConnection> <tag>apache-curator-3.2.0</tag> </scm> @@ -318,6 +320,7 @@ <module>curator-x-discovery-server</module> <module>curator-x-async</module> <module>curator-test-zk34</module> + <module>curator-test-zk35</module> </modules> <dependencyManagement> @@ -567,6 +570,24 @@ <artifactId>dropwizard-logging</artifactId> <version>${dropwizard-version}</version> </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${dropwizard-version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + 
</exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>${snappy-version}</version> + </dependency> </dependencies> </dependencyManagement> @@ -582,7 +603,6 @@ <artifactId>maven-javadoc-plugin</artifactId> <version>${maven-javadoc-plugin-version}</version> <configuration> - <aggregate>true</aggregate> <additionalJOptions> <additionalJOption>-J-Xmx1g</additionalJOption> </additionalJOptions> @@ -872,7 +892,8 @@ <relocations> <relocation> <pattern>com.google</pattern> - <shadedPattern>org.apache.curator.shaded.com.google</shadedPattern> + <shadedPattern>org.apache.curator.shaded.com.google + </shadedPattern> <excludes> <exclude>com.google.common.base.Function</exclude> <exclude>com.google.common.base.Predicate</exclude> diff --git a/src/site/confluence/zk-compatibility.confluence b/src/site/confluence/zk-compatibility.confluence index ed6e32e..cd65784 100644 --- a/src/site/confluence/zk-compatibility.confluence +++ b/src/site/confluence/zk-compatibility.confluence @@ -1,15 +1,46 @@ h1. ZooKeeper Version Compatibility -While ZooKeeper 3.5.x is still considered "beta" by the ZooKeeper development team, the reality is that it is -used in production by many users. However, ZooKeeper 3.4.x is also used in production. Prior to Apache Curator -4.0, both versions of ZooKeeper were supported via two versions of Apache Curator. Starting with Curator 4.0 -both versions of ZooKeeper are supported via the same Curator libraries. +There are multiple active version lines of ZooKeeper used in production by users. Curator can act +as a client for any of these versions. h2. ZooKeeper 3.5.x -* Curator 4.0 has a hard dependency on ZooKeeper 3.5.x +* Curator 4.x has a hard dependency on ZooKeeper 3.5.x * If you are using ZooKeeper 3.5.x there's nothing additional to do \- just use Curator 4.0 +h2. 
ZooKeeper 3.6.x + +ZooKeeper 3.6.x is the newest version of ZooKeeper which adds new features such as +Persistent/Recursive watchers. To use Curator +X.X with ZooKeeper 3.6.x use the dependency +{{curator-v2}} instead of {{curator-recipes}}. Create a {{CuratorFramework}} instance in the +normal manner. To get access to new features, wrap the CuratorFramework in a CuratorFrameworkV2 +instance. E.g. + +{code} +CuratorFramework client = CuratorFrameworkFactory... +CuratorFrameworkV2 clientV2 = CuratorFrameworkV2.wrap(client); + +client.watches().add()... +{code} + +_Maven_ + +{code} +<dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-v2</artifactId> + <version>${curator-version}</version> +</dependency> +{code} + +_Gradle_ + +{code} +compile 'org.apache.curator:curator-v2:$curatorVersion' +{code} + + h2. ZooKeeper 3.4.x Curator 4.0 supports ZooKeeper 3.4.x ensembles in a soft\-compatibility mode. To use this mode
