This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher in repository https://gitbox.apache.org/repos/asf/curator.git
commit bdddaf9ea96e5cd1732ea00f51ff092b1dcc2d60 Author: randgalt <[email protected]> AuthorDate: Wed Oct 2 20:09:32 2019 -0500 Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs. --- .../org/apache/curator/utils/Compatibility.java | 25 +++ .../curator/utils/DefaultZookeeperFactory.java | 3 +- .../apache/curator/framework/CuratorFramework.java | 2 +- .../curator/framework/api/AddWatchBuilder.java | 21 +-- .../curator/framework/api/AddWatchBuilder2.java | 15 +- .../apache/curator/framework/api/AddWatchable.java | 27 ++-- .../curator/framework/api/CuratorEventType.java | 7 +- .../curator/framework/api/WatchesBuilder.java | 63 ++++---- .../framework/imps/AddWatchBuilderImpl.java | 171 +++++++++++++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 4 +- .../imps/CuratorMultiTransactionRecord.java | 34 ++-- .../framework/imps/ReconfigBuilderImpl.java | 18 ++- .../framework/imps/RemoveWatchesBuilderImpl.java | 9 +- .../curator/framework/imps/WatchesBuilderImpl.java | 47 ++++++ .../apache/curator/framework/imps/Watching.java | 10 +- .../curator/framework/imps/TestFramework.java | 51 ++++++ .../recipes/leader/ChaosMonkeyCnxnFactory.java | 7 +- curator-test/pom.xml | 10 ++ .../apache/curator/test/TestingQuorumPeerMain.java | 3 +- .../apache/curator/test/TestingZooKeeperMain.java | 5 +- .../org/apache/curator/test/WatchersDebug.java | 9 ++ .../x/async/api/AsyncCuratorFrameworkDsl.java | 7 + .../curator/x/async/api/AsyncWatchBuilder.java | 52 +++++++ .../x/async/details/AsyncCuratorFrameworkImpl.java | 6 + .../x/async/details/AsyncWatchBuilderImpl.java | 94 +++++++++++ .../curator/framework/imps/TestFramework.java | 59 +++++++ pom.xml | 23 ++- 27 files changed, 686 insertions(+), 96 deletions(-) diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java index 1ee2301..618b302 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java +++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java @@ -31,6 +31,7 @@ import java.lang.reflect.Method; public class Compatibility { private static final boolean hasZooKeeperAdmin; + private static final boolean hasPersistentWatches; private static final Method queueEventMethod; private static final Logger logger = LoggerFactory.getLogger(Compatibility.class); @@ -61,6 +62,19 @@ public class Compatibility LoggerFactory.getLogger(Compatibility.class).info("Using emulated InjectSessionExpiration"); } queueEventMethod = localQueueEventMethod; + + boolean localHasPersistentWatches; + try + { + Class.forName("org.apache.zookeeper.AddWatchMode"); + localHasPersistentWatches = true; + } + catch ( ClassNotFoundException e ) + { + localHasPersistentWatches = false; + logger.info("Running without persistent watches"); + } + hasPersistentWatches = localHasPersistentWatches; } /** @@ -74,6 +88,17 @@ public class Compatibility } /** + * Return true of persistent watches are available in the + * ZooKeeper library being used. + * + * @return true/false + */ + public static boolean hasPersistentWatches() + { + return hasPersistentWatches; + } + + /** * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>. * For ZooKeeper 3.4.x do the equivalent via reflection * diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java index 42279d0..acd32e7 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java @@ -20,12 +20,13 @@ package org.apache.curator.utils; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.admin.ZooKeeperAdmin; public class DefaultZookeeperFactory implements ZookeeperFactory { @Override public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); + return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly); } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 3737faa..a803e63 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -196,7 +196,7 @@ public interface CuratorFramework extends Closeable * Start a remove watches builder. * @return builder object */ - public RemoveWatchesBuilder watches(); + public WatchesBuilder watches(); /** * Returns the listenable interface for the Connect State diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java similarity index 65% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java index 42279d0..ad6d434 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java @@ -16,16 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; +package org.apache.curator.framework.api; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.AddWatchMode; -public class DefaultZookeeperFactory implements ZookeeperFactory +public interface AddWatchBuilder extends AddWatchBuilder2 { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} + /** + * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used + * + * @param mode mode to use + * @return this + */ + AddWatchBuilder2 withMode(AddWatchMode mode); +} \ No newline at end of file diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java similarity index 65% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java index 42279d0..b784fc5 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java @@ -16,16 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; +package org.apache.curator.framework.api; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; - -public class DefaultZookeeperFactory implements ZookeeperFactory +public interface AddWatchBuilder2 extends + Backgroundable<AddWatchable<Pathable<Void>>>, AddWatchable<Pathable<Void>> { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} +} \ No newline at end of file diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java similarity index 68% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java index 42279d0..dbd2666 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java @@ -16,16 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.curator.utils; +package org.apache.curator.framework.api; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -public class DefaultZookeeperFactory implements ZookeeperFactory +public interface AddWatchable<T> { - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(Watcher watcher); + + /** + * Set a watcher for the operation + * + * @param watcher the watcher + * @return this + */ + T usingWatcher(CuratorWatcher watcher); +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java index 5dea211..22cc181 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java @@ -96,5 +96,10 @@ public enum CuratorEventType /** * Event sent when client is being closed */ - CLOSING + CLOSING, + + /** + * Corresponds to {@link CuratorFramework#addWatch()} + */ + ADD_WATCH } diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java similarity index 65% copy from curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java copy to curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java index 42279d0..821f1a3 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java @@ -1,31 +1,32 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.utils; - -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; - -public class DefaultZookeeperFactory implements ZookeeperFactory -{ - @Override - public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception - { - return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +/** + * Builder to allow watches to be removed + */ +public interface WatchesBuilder extends RemoveWatchesBuilder +{ + /** + * Start an add watch operation + * + * @return builder + */ + AddWatchBuilder add(); +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java new file mode 100644 index 0000000..64e1de5 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.RetryLoop; +import org.apache.curator.drivers.OperationTrace; +import org.apache.curator.framework.api.AddWatchBuilder; +import org.apache.curator.framework.api.AddWatchBuilder2; +import org.apache.curator.framework.api.AddWatchable; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.Pathable; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.Watcher; +import java.util.concurrent.Executor; + +public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String> +{ + private final CuratorFrameworkImpl client; + private Watching watching = null; + private Backgrounding backgrounding = new Backgrounding(); + private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE; + + AddWatchBuilderImpl(CuratorFrameworkImpl client) + { + this.client = client; + } + + public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode) + { + this.client = client; + this.watching = watching; + this.backgrounding = backgrounding; + this.mode = mode; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground() + { + backgrounding = new Backgrounding(); + return this; + } + + @Override + public AddWatchBuilder2 withMode(AddWatchMode mode) + { + this.mode = mode; + return this; + } + + @Override + public Pathable<Void> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public Pathable<Void> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(Object context) + { + backgrounding = new Backgrounding(context); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback) + { + backgrounding = new Backgrounding(callback); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context) + { + backgrounding = new Backgrounding(callback, context); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor) + { + backgrounding = new Backgrounding(callback, executor); + return this; + } + + @Override + public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor) + { + backgrounding = new Backgrounding(client, callback, context, executor); + return this; + } + + @Override + public Void forPath(String path) throws Exception + { + if ( backgrounding.inBackground() ) + { + client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null); + } + else + { + pathInForeground(path); + } + return null; + } + + @Override + public void performBackgroundOperation(final OperationAndData<String> data) throws Exception + { + String path = data.getData(); + String fixedPath = client.fixForNamespace(path); + try + { + final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background"); + client.getZooKeeper().addWatch + ( + fixedPath, + watching.getWatcher(path), + mode, + (rc, path1, ctx) -> { + trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit(); + CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null); + client.processBackgroundOperation(data, event); + }, + backgrounding.getContext() + ); + } + catch ( Throwable e ) + { + backgrounding.checkError(e, watching); + } + } + + private void pathInForeground(final String path) throws Exception + { + final String fixedPath = client.fixForNamespace(path); + OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground"); + RetryLoop.callWithRetry + ( + client.getZookeeperClient(), () -> { + client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode); + return null; + }); + trace.setPath(fixedPath).setWithWatcher(true).commit(); + } +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index e003bf0..c8ebbb6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -565,10 +565,10 @@ public class CuratorFrameworkImpl implements CuratorFramework } @Override - public RemoveWatchesBuilder watches() + public WatchesBuilder watches() { Preconditions.checkState(!isZk34CompatibilityMode(), "Remove watches APIs are not support when running in ZooKeeper 3.4 compatibility mode"); - return new RemoveWatchesBuilderImpl(this); + return new WatchesBuilderImpl(this); } protected void internalSync(CuratorFrameworkImpl impl, String path, Object context) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java index 3e72609..fbac6e6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java @@ -16,49 +16,57 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import com.google.common.collect.Lists; import org.apache.curator.framework.api.transaction.OperationType; import org.apache.curator.framework.api.transaction.TypeAndPath; -import org.apache.zookeeper.MultiTransactionRecord; import org.apache.zookeeper.Op; import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; -class CuratorMultiTransactionRecord extends MultiTransactionRecord +class CuratorMultiTransactionRecord implements Iterable<Op> { - private final List<TypeAndPath> metadata = Lists.newArrayList(); - - @Override - public final void add(Op op) - { - throw new UnsupportedOperationException(); - } + private final List<TypeAndPath> metadata = Lists.newArrayList(); + private List<Op> ops = new ArrayList<>(); void add(Op op, OperationType type, String forPath) { - super.add(op); + ops.add(op); metadata.add(new TypeAndPath(type, forPath)); } - TypeAndPath getMetadata(int index) + TypeAndPath getMetadata(int index) { return metadata.get(index); } - int metadataSize() + int metadataSize() { return metadata.size(); } void addToDigest(MessageDigest digest) { - for ( Op op : this ) + for ( Op op : ops ) { digest.update(op.getPath().getBytes()); digest.update(Integer.toString(op.getType()).getBytes()); digest.update(op.toRequestRecord().toString().getBytes()); } } + + @Override + public Iterator<Op> iterator() + { + return ops.iterator(); + } + + int size() + { + return ops.size(); + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 97be59a..f8b78da 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -24,6 +24,8 @@ import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.admin.ZooKeeperAdmin; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import java.util.Arrays; @@ -268,7 +270,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation client.processBackgroundOperation(data, event); } }; - client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); + admin().reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); } catch ( Throwable e ) { @@ -276,6 +278,18 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation } } + private ZooKeeperAdmin admin() throws Exception + { + try + { + return (ZooKeeperAdmin)client.getZooKeeper(); + } + catch ( ClassCastException e ) + { + throw new Exception("ZooKeeper instance is not an instance of ZooKeeperAdmin"); + } + } + private byte[] ensembleInForeground() throws Exception { TimeTrace trace = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground"); @@ -287,7 +301,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation @Override public byte[] call() throws Exception { - return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat); + return admin().reconfigure(joining, leaving, newMembers, fromConfig, responseStat); } } ); diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index e14deff..b268e71 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -201,8 +201,13 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat } return null; - } - + } + + CuratorFrameworkImpl getClient() + { + return client; + } + private void pathInBackground(final String path) { OperationAndData.ErrorCallback<String> errorCallback = null; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java new file mode 100644 index 0000000..f3c3857 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.api.AddWatchBuilder; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.WatchesBuilder; +import org.apache.curator.utils.Compatibility; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.WatcherType; + +public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder +{ + public WatchesBuilderImpl(CuratorFrameworkImpl client) + { + super(client); + } + + public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding) + { + super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding); + } + + @Override + public AddWatchBuilder add() + { + Preconditions.checkState(Compatibility.hasPersistentWatches(), "Persistent watches APIs are not support when running the ZooKeeper library being used"); + return new AddWatchBuilderImpl(getClient()); + } +} \ No newline at end of file diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index daa5dd3..5bad7e7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -class Watching +public class Watching { private final Watcher watcher; private final CuratorWatcher curatorWatcher; @@ -31,7 +31,7 @@ class Watching private final CuratorFrameworkImpl client; private NamespaceWatcher namespaceWatcher; - Watching(CuratorFrameworkImpl client, boolean watched) + public Watching(CuratorFrameworkImpl client, boolean watched) { this.client = client; this.watcher = null; @@ -39,7 +39,7 @@ class Watching this.watched = watched; } - Watching(CuratorFrameworkImpl client, Watcher watcher) + public Watching(CuratorFrameworkImpl client, Watcher watcher) { this.client = client; this.watcher = watcher; @@ -47,7 +47,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) + public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) { this.client = client; this.watcher = null; @@ -55,7 +55,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client) + public Watching(CuratorFrameworkImpl client) { this.client = client; watcher = null; diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java index fe49ad7..c448043 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java @@ -37,6 +37,7 @@ import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.EnsurePath; import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZookeeperFactory; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -1265,4 +1266,54 @@ public class TestFramework extends BaseClassForTests CloseableUtils.closeQuietly(client); } } + + @Test(groups = Zk35MethodInterceptor.zk35Group) + public void testPersistentRecursiveWatch() throws Exception + { + Timing2 timing = new Timing2(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + Watcher watcher = events::add; + client.watches().add().usingWatcher(watcher).forPath("/top/main"); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a"); + client.setData().forPath("/top/main/a", "foo".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged); + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test(groups = Zk35MethodInterceptor.zk35Group) + public void testPersistentWatch() throws Exception + { + Timing2 timing = new Timing2(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + Watcher watcher = events::add; + client.watches().add().withMode(AddWatchMode.PERSISTENT).usingWatcher(watcher).forPath("/top/main"); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + client.setData().forPath("/top/main/a", "foo".getBytes()); + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java index 4cb342c..f3dda40 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java @@ -26,6 +26,7 @@ import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.NIOServerCnxn; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZooKeeperServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +93,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory log.debug("Rejected : " + si.toString()); // Still reject request log.debug("Still not ready for " + remaining + "ms"); - ((NIOServerCnxn)si.cnxn).close(); + ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN); return; } // Submit the request to the legacy Zookeeper server @@ -113,13 +114,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory firstError = System.currentTimeMillis(); // The znode has been created, close the connection and don't tell it to client log.warn("Closing connection right after " + createRequest.getPath() + " creation"); - ((NIOServerCnxn)si.cnxn).close(); + ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN); } } catch ( Exception e ) { // Should not happen - ((NIOServerCnxn)si.cnxn).close(); + ((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN); } } } diff --git a/curator-test/pom.xml b/curator-test/pom.xml index 3683b7d..4f0c5a2 100644 --- a/curator-test/pom.xml +++ b/curator-test/pom.xml @@ -41,6 +41,16 @@ </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java index 3b3ab26..4baaff9 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java @@ -18,6 +18,7 @@ */ package org.apache.curator.test; +import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeerMain; @@ -39,7 +40,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace Field cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory"); cnxnFactoryField.setAccessible(true); ServerCnxnFactory cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer); - cnxnFactory.closeAll(); + cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java index 841df77..8e5ffee 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java @@ -23,6 +23,7 @@ import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.jmx.ZKMBeanInfo; import org.apache.zookeeper.server.ContainerManager; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZKDatabase; @@ -81,7 +82,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { if ( cnxnFactory != null ) { - cnxnFactory.closeAll(); + cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); @@ -262,7 +263,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config) { - super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null); + super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), config.getClientPortListenBacklog(), new ZKDatabase(txnLog), ""); } private final AtomicBoolean isRunning = new AtomicBoolean(false); diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java index e4c3b7e..e884b8c 100644 --- a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java +++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java @@ -27,16 +27,19 @@ public class WatchersDebug private static final Method getDataWatches; private static final Method getExistWatches; private static final Method getChildWatches; + private static final Method getPersistentWatches; static { Method localGetDataWatches = null; Method localGetExistWatches = null; Method localGetChildWatches = null; + Method localGetPersistentWatches = null; try { localGetDataWatches = getMethod("getDataWatches"); localGetExistWatches = getMethod("getExistWatches"); localGetChildWatches = getMethod("getChildWatches"); + localGetPersistentWatches = getMethod("getPersistentWatches"); } catch ( NoSuchMethodException e ) { @@ -45,6 +48,7 @@ public class WatchersDebug getDataWatches = localGetDataWatches; getExistWatches = localGetExistWatches; getChildWatches = localGetChildWatches; + getPersistentWatches = localGetPersistentWatches; } public static List<String> getDataWatches(ZooKeeper zooKeeper) @@ -62,6 +66,11 @@ public class WatchersDebug return callMethod(zooKeeper, getChildWatches); } + public static List<String> getPersistentWatches(ZooKeeper zooKeeper) + { + return callMethod(zooKeeper, getPersistentWatches); + } + private WatchersDebug() { } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java index bc66bb6..d70c02c 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java @@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework AsyncReconfigBuilder reconfig(); /** + * Start an add watch builder + * + * @return builder object + */ + AsyncWatchBuilder addWatch(); + + /** * Start a transaction builder * * @return builder object diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java new file mode 100644 index 0000000..f782a4c --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.curator.x.async.api; + + import org.apache.curator.framework.api.AddWatchable; + import org.apache.curator.x.async.AsyncStage; + import org.apache.zookeeper.AddWatchMode; + +public interface AsyncWatchBuilder extends AddWatchable<AsyncPathable<AsyncStage<Void>>> + { + /** + * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used + * + * @param mode mode + * @return this + */ + AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode); + } \ No newline at end of file diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index 167cf50..41ddee2 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override + public AsyncWatchBuilder addWatch() + { + return new AsyncWatchBuilderImpl(client, filters); + } + + @Override public AsyncMultiTransaction transaction() { return operations -> { diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java new file mode 100644 index 0000000..3092f51 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.curator.x.async.details; + + import org.apache.curator.framework.api.AddWatchable; + import org.apache.curator.framework.api.CuratorWatcher; + import org.apache.curator.framework.imps.AddWatchBuilderImpl; + import org.apache.curator.framework.imps.CuratorFrameworkImpl; + import org.apache.curator.framework.imps.Watching; + import org.apache.curator.x.async.AsyncStage; + import org.apache.curator.x.async.api.AsyncPathable; + import org.apache.curator.x.async.api.AsyncWatchBuilder; + import org.apache.zookeeper.AddWatchMode; + import org.apache.zookeeper.Watcher; + + import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc; + import static org.apache.curator.x.async.details.BackgroundProcs.safeCall; + + class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> + { + private final CuratorFrameworkImpl client; + private final Filters filters; + private Watching watching = null; + private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE; + + AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters) + { + this.client = client; + this.filters = filters; + } + + @Override + public AddWatchable<AsyncPathable<AsyncStage<Void>>> withMode(AddWatchMode mode) + { + this.mode = mode; + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher) + { + watching = new Watching(client, watcher); + return this; + } + + @Override + public AsyncStage<Void> forPath(String path) + { + BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); + AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode); + return safeCall(common.internalCallback, () -> builder.forPath(path)); + } + } \ No newline at end of file diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java index 27a84d0..f2b51fe 100644 --- a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java @@ -30,6 +30,8 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.curator.test.compatibility.Zk35MethodInterceptor; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.async.AsyncCuratorFramework; @@ -37,8 +39,11 @@ import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.api.DeleteOption; import org.apache.curator.x.async.api.ExistsOption; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.testng.Assert; @@ -657,4 +662,58 @@ public class TestFramework extends BaseClassForTests CloseableUtils.closeQuietly(client); } } + + @Test(groups = Zk35MethodInterceptor.zk35Group) + public void testPersistentRecursiveWatch() throws Exception + { + Timing2 timing = new Timing2(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + Watcher watcher = events::add; + async.addWatch().usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get(); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a"); + client.setData().forPath("/top/main/a", "foo".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged); + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test(groups = Zk35MethodInterceptor.zk35Group) + public void testPersistentWatch() throws Exception + { + Timing2 timing = new Timing2(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>(); + Watcher watcher = events::add; + async.addWatch().withMode(AddWatchMode.PERSISTENT).usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get(); + + client.create().creatingParentsIfNeeded().forPath("/top/main/a"); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + client.setData().forPath("/top/main/a", "foo".getBytes()); + client.setData().forPath("/top/main", "bar".getBytes()); + Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main"); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } } diff --git a/pom.xml b/pom.xml index 3a5e152..27b34cd 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ <jdk-version>1.${short-jdk-version}</jdk-version> <!-- versions --> - <zookeeper-version>3.5.5</zookeeper-version> + <zookeeper-version>3.6.0-SNAPSHOT</zookeeper-version> <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version> <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version> <doxia-module-confluence-version>1.8</doxia-module-confluence-version> @@ -85,10 +85,11 @@ <guava-failureaccess-version>1.0.1</guava-failureaccess-version> <testng-version>6.14.3</testng-version> <swift-version>0.23.1</swift-version> - <dropwizard-version>1.3.7</dropwizard-version> <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version> <slf4j-version>1.7.25</slf4j-version> <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version> + <dropwizard-version>3.2.5</dropwizard-version> + <snappy-version>1.1.7</snappy-version> <!-- OSGi Properties --> <osgi.export.package /> @@ -567,6 +568,24 @@ <artifactId>dropwizard-logging</artifactId> <version>${dropwizard-version}</version> </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${dropwizard-version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>${snappy-version}</version> + </dependency> </dependencies> </dependencyManagement>
