1. Updates from the latest ZooKeeper (ZK) changes. 2. Added an async version for creating a persistent watch
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d7bf1a24 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d7bf1a24 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d7bf1a24 Branch: refs/heads/persistent-watch Commit: d7bf1a2461ecfa0bf708b3890dc4b3a019cacdb0 Parents: a27f876 Author: randgalt <[email protected]> Authored: Wed Oct 4 16:01:51 2017 +0200 Committer: randgalt <[email protected]> Committed: Wed Oct 4 16:01:51 2017 +0200 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 5 +++++ .../imps/AddPersistentWatchBuilderImpl.java | 10 +++++++++- .../apache/curator/framework/imps/Watching.java | 10 +++++----- .../framework/recipes/cache/TreeCacheBridge.java | 18 ++++++++++++++++++ .../recipes/cache/TreeCacheBridgeImpl.java | 18 ++++++++++++++++++ .../framework/recipes/watch/CacheSelectors.java | 6 ++---- .../framework/recipes/watch/CachedNode.java | 18 ++++++++++++++++++ .../x/async/api/AsyncCuratorFrameworkDsl.java | 7 +++++++ .../async/details/AsyncCuratorFrameworkImpl.java | 6 ++++++ .../curator/x/async/TestBasicOperations.java | 14 ++++++++++++++ 10 files changed, 102 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index ce31d08..f075daa 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -193,6 +193,11 @@ public interface 
CuratorFramework extends Closeable */ public SyncBuilder sync(); + /** + * Start a persistent watch builder + * + * @return builder object + */ public AddPersistentWatchBuilder addPersistentWatch(); /** http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java index 56f8f79..4f51f39 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java @@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String> +public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String> { private final CuratorFrameworkImpl client; private Watching watching = null; @@ -45,6 +45,14 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab this.client = client; } + public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive) + { + this.client = client; + this.watching = watching; + this.backgrounding = backgrounding; + this.recursive = recursive; + } + @Override public AddPersistentWatchable<Pathable<Void>> inBackground() { http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java 
---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index daa5dd3..5bad7e7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -class Watching +public class Watching { private final Watcher watcher; private final CuratorWatcher curatorWatcher; @@ -31,7 +31,7 @@ class Watching private final CuratorFrameworkImpl client; private NamespaceWatcher namespaceWatcher; - Watching(CuratorFrameworkImpl client, boolean watched) + public Watching(CuratorFrameworkImpl client, boolean watched) { this.client = client; this.watcher = null; @@ -39,7 +39,7 @@ class Watching this.watched = watched; } - Watching(CuratorFrameworkImpl client, Watcher watcher) + public Watching(CuratorFrameworkImpl client, Watcher watcher) { this.client = client; this.watcher = watcher; @@ -47,7 +47,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) + public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) { this.client = client; this.watcher = null; @@ -55,7 +55,7 @@ class Watching this.watched = false; } - Watching(CuratorFrameworkImpl client) + public Watching(CuratorFrameworkImpl client) { this.client = client; watcher = null; http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java index 8b6f37a..4a0eed9 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.apache.curator.framework.recipes.cache; import org.apache.curator.framework.listen.Listenable; http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java index 0198aa4..35fcac4 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.apache.curator.framework.recipes.cache; import com.google.common.util.concurrent.MoreExecutors; http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java index 8814e57..ed6c6fa 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java @@ -21,9 +21,8 @@ package org.apache.curator.framework.recipes.watch; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.server.PathIterator; +import org.apache.zookeeper.server.PathParentIterator; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -315,10 +314,9 @@ public class CacheSelectors private CacheSelector getSelector(String fullPath) { - String parent = ZKPaths.getPathAndNode(fullPath).getPath(); for ( CompositeEntry entry : entries ) { - PathIterator pathIterator = new PathIterator(fullPath); + PathParentIterator pathIterator = PathParentIterator.forAll(fullPath); while ( pathIterator.hasNext() ) { if ( pathIterator.next().equals(entry.path) ) http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java index b07993f..18131cf 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.apache.curator.framework.recipes.watch; import org.apache.zookeeper.data.Stat; http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java index bc66bb6..c1748d0 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java @@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework AsyncReconfigBuilder reconfig(); /** + * Start a persistent watch builder + * + * @return builder object + */ + AsyncPersistentWatchBuilder addPersistentWatch(); + + /** * Start a transaction builder * * @return builder object http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index 167cf50..afa1de0 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework } @Override + public AsyncPersistentWatchBuilderImpl addPersistentWatch() + { + return new AsyncPersistentWatchBuilderImpl(client, filters); + } + + @Override public 
AsyncMultiTransaction transaction() { return operations -> { http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java index f814146..aed1385 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java @@ -32,8 +32,10 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.IOException; import java.util.Arrays; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import static java.util.EnumSet.of; import static org.apache.curator.x.async.api.CreateOption.compress; @@ -199,4 +201,16 @@ public class TestBasicOperations extends CompletableBaseClassForTests complete(client.getData().storingStatIn(stat).forPath("/test")); Assert.assertEquals(stat.getDataLength(), "hey".length()); } + + @Test + public void testPersistentRecursiveWatch() throws Exception + { + BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<>(); + Watcher watcher = event -> events.add(event.getType()); + complete(client.addPersistentWatch().recursive().usingWatcher(watcher).forPath("/a/b")); + client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/c"); + client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/d"); + Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated); + Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated); + } }
