This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-functional
in repository https://gitbox.apache.org/repos/asf/curator.git

commit eaa5da246a2d5584b121dd18aac6d6f8e70f1034
Author: randgalt <[email protected]>
AuthorDate: Mon Oct 14 15:01:01 2019 +0300

    Added some functional utilities that will aid using persistent/recursive 
watchers
---
 .../framework/recipes/watch/WatcherHelpers.java    |  79 +++++++++++++
 .../curator/framework/recipes/watch/Watchers.java  |  93 +++++++++++++++
 .../framework/recipes/watch/WatchersImpl.java      | 110 ++++++++++++++++++
 .../framework/recipes/watch/TestWatchers.java      | 125 +++++++++++++++++++++
 .../x/async/modeled/details/ModeledCacheImpl.java  |  47 ++++++--
 src/site/confluence/utilities.confluence           |  17 +++
 6 files changed, 459 insertions(+), 12 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
new file mode 100644
index 0000000..4db368a
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.IntPredicate;
+import java.util.function.Predicate;
+
+public class WatcherHelpers
+{
+    private static final ZKPaths.PathAndNode NULL_PATH_AND_NODE = new 
ZKPaths.PathAndNode("/", "");
+
+    public static ZKPaths.PathAndNode mapToPathAndNode(WatchedEvent event)
+    {
+        if ( event.getType() == Watcher.Event.EventType.None )
+        {
+            return NULL_PATH_AND_NODE;
+        }
+        return ZKPaths.getPathAndNode(event.getPath());
+    }
+
+    public static List<String> mapToParts(WatchedEvent event)
+    {
+        if ( event.getType() == Watcher.Event.EventType.None )
+        {
+            return Collections.emptyList();
+        }
+        return ZKPaths.split(event.getPath());
+    }
+
+    public static Predicate<WatchedEvent> filterNode(Predicate<String> 
nodeFilter)
+    {
+        return event -> nodeFilter.test(mapToPathAndNode(event).getNode());
+    }
+
+    public static Predicate<WatchedEvent> filterPath(Predicate<String> 
pathFilter)
+    {
+        return event -> pathFilter.test(mapToPathAndNode(event).getPath());
+    }
+
+    public static Predicate<WatchedEvent> filterDepth(IntPredicate depthFilter)
+    {
+        return event -> depthFilter.test(mapToParts(event).size());
+    }
+
+    public static Predicate<WatchedEvent> filterSystemEvents()
+    {
+        return event -> event.getType() == Watcher.Event.EventType.None;
+    }
+
+    public static Predicate<WatchedEvent> filterNonSystemEvents()
+    {
+        return event -> event.getType() != Watcher.Event.EventType.None;
+    }
+
+    private WatcherHelpers()
+    {
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
new file mode 100644
index 0000000..600a7ec
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * Functional builders to wrap a ZooKeeper Watcher. Call the filter/map/etc. 
methods
+ * as needed and complete by calling {@link 
Next#process(java.util.function.Consumer)} which
+ * returns a Watcher that can be passed to any Curator/ZooKeeper method that 
accepts Watchers.
+ */
+public interface Watchers
+{
+    /**
+     * Filter the chained watched event. If the predicate returns false, the 
chain ends and no further processing
+     * occurs.
+     *
+     * @param filter predicate
+     * @return chain
+     */
+    static Next<WatchedEvent> filter(Predicate<WatchedEvent> filter)
+    {
+        return new WatchersImpl<>(filter);
+    }
+
+    /**
+     * Map the chained watched event to a new object
+     *
+     * @param mapper mapper
+     * @return chain
+     */
+    static <R> Next<R> map(Function<WatchedEvent, ? super R> mapper)
+    {
+        return new WatchersImpl<>(mapper);
+    }
+
+    interface Next<T>
+    {
+        /**
+         * Filter the chained value. If the predicate returns false, the chain 
ends and no further processing
+         * occurs.
+         *
+         * @param filter predicate
+         * @return chain
+         */
+        Next<T> filter(Predicate<? super T> filter);
+
+        /**
+         * Peek at the chained value
+         *
+         * @param consumer consumer
+         * @return chain
+         */
+        Next<T> peek(Consumer<? super T> consumer);
+
+        /**
+         * Map the chained value to a new object
+         *
+         * @param mapper mapper
+         * @return chain
+         */
+        <R> Next<R> map(Function<? super T, ? extends R> mapper);
+
+        /**
+         * Complete the chain and return a new Watcher that consists of all 
the previous steps in the
+         * chain. When the Watcher is called by ZooKeeper, all steps of the 
chain are called in order.
+         *
+         * @param handler consumer for the final value of the chain
+         * @return new Watcher
+         */
+        Watcher process(Consumer<T> handler);
+    }
+}
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
new file mode 100644
index 0000000..ed0e02f
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+public class WatchersImpl<T> implements Watchers.Next<T>
+{
+    private final Step step;
+
+    private static class Step
+    {
+        private final Predicate filter;
+        private final Function mapper;
+
+        private final Step head;
+        private Step next;
+
+        Step(Predicate filter, Function mapper)
+        {
+            this(filter, mapper, null);
+        }
+
+        Step(Predicate filter, Function mapper, Step head)
+        {
+            this.filter = (filter != null) ? filter : (__ -> true);
+            this.mapper = (mapper != null) ? mapper : Function.identity();
+            this.head = (head != null) ? head : this;
+        }
+    }
+
+    WatchersImpl(Predicate<WatchedEvent> filter)
+    {
+        step = new Step(filter, null);
+    }
+
+    <R> WatchersImpl(Function<WatchedEvent, ? super R> mapper)
+    {
+        step = new Step(null, mapper);
+    }
+
+    private WatchersImpl(Step step)
+    {
+        this.step = step;
+    }
+
+    @Override
+    public Watchers.Next<T> filter(Predicate<? super T> filter)
+    {
+        this.step.next = new Step(filter, null, step.head);
+        return new WatchersImpl<>(this.step.next);
+    }
+
+    @Override
+    public Watchers.Next<T> peek(Consumer<? super T> consumer)
+    {
+        return filter(value -> {
+            consumer.accept(value);
+            return true;
+        });
+    }
+
+    @Override
+    public <R> Watchers.Next<R> map(Function<? super T, ? extends R> mapper)
+    {
+        this.step.next = new Step(null, mapper, step.head);
+        return new WatchersImpl<>(this.step.next);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")  // all public APIs are correctly typed so 
this is safe
+    public Watcher process(Consumer<T> handler)
+    {
+        return event -> {
+            Object value = event;
+            Step currentStep = step.head;
+            while ( currentStep != null )
+            {
+                if ( !currentStep.filter.test(value) )
+                {
+                    return; // exit lambda
+                }
+                value = currentStep.mapper.apply(value);
+                currentStep = currentStep.next;
+            }
+
+            ((Consumer)handler).accept(value);
+        };
+    }
+}
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
new file mode 100644
index 0000000..45185b2
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.curator.framework.recipes.watch.WatcherHelpers.*;
+import static org.apache.curator.framework.recipes.watch.Watchers.filter;
+import static org.apache.curator.framework.recipes.watch.Watchers.map;
+
+public class TestWatchers extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testFilterFirst() throws Exception
+    {
+        try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1)) )
+        {
+            client.start();
+
+            Semaphore latch = new Semaphore(0);
+            Watcher watcher = filter(filterNonSystemEvents())
+                .filter(event -> event.getType() == 
Watcher.Event.EventType.NodeCreated)
+                .map(WatcherHelpers::mapToPathAndNode)
+                .filter(z -> z.getPath().equals("/one/two"))
+                .process(__ -> latch.release());
+            
client.checkExists().usingWatcher(watcher).forPath("/one/two/three");
+
+            client.create().forPath("/one");
+            timing.sleepABit();
+            Assert.assertEquals(latch.availablePermits(), 0);
+
+            client.create().forPath("/one/two");
+            timing.sleepABit();
+            Assert.assertEquals(latch.availablePermits(), 0);
+
+            client.create().forPath("/one/two/three");
+            Assert.assertTrue(timing.acquireSemaphore(latch, 1));
+        }
+    }
+
+    @Test
+    public void testMapFirst() throws Exception
+    {
+        try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1)) )
+        {
+            client.start();
+
+            BlockingQueue<String> result = new LinkedBlockingQueue<>();
+            BlockingQueue<String> peekedResult = new LinkedBlockingQueue<>();
+            Watcher watcher = map(event -> "The path is: " + event.getPath())
+                .peek(peekedResult::add)
+                .process(result::add);
+            client.checkExists().usingWatcher(watcher).forPath("/one");
+
+            client.create().forPath("/one");
+            Assert.assertEquals(timing.takeFromQueue(peekedResult), "The path 
is: /one");
+            Assert.assertEquals(timing.takeFromQueue(result), "The path is: 
/one");
+        }
+    }
+
+    @Test
+    public void testSystemEvents() throws Exception
+    {
+        try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1)) )
+        {
+            client.start();
+
+            CountDownLatch latch = new CountDownLatch(1);
+            Watcher watcher = filter(filterSystemEvents())
+                .process(__ -> latch.countDown());
+            client.checkExists().usingWatcher(watcher).forPath("/one");
+            server.stop();
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test
+    public void testHelpers() throws Exception
+    {
+        try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1)) )
+        {
+            client.start();
+
+            BlockingQueue<String> result = new LinkedBlockingQueue<>();
+            Watcher watcher = filter(filterPath(node -> node.startsWith("/a")))
+                .filter(filterNode(node -> node.equals("c")))
+                .filter(filterDepth(depth -> depth > 1))
+                .map(WatcherHelpers::mapToParts)
+                .map(parts -> String.join("|", parts))
+                .process(result::add);
+            client.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+            client.create().creatingParentsIfNeeded().forPath("/a/b/c");
+            Assert.assertEquals(timing.takeFromQueue(result), "a|b|c");
+        }
+    }
+}
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index a239473..b95e92d 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -20,17 +20,18 @@ package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
@@ -41,10 +42,10 @@ import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final CacheInterface cache;
+    private final TreeCache cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
-    private final StandardListenerManager<ModeledCacheListener<T>> 
listenerContainer = StandardListenerManager.standard();
+    private final ListenerContainer<ModeledCacheListener<T>> listenerContainer 
= new ListenerContainer<>();
     private final ZPath basePath;
 
     private static final class Entry<T>
@@ -68,17 +69,30 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = CacheInterface.build(client, basePath.fullPath(), executor, 
modelSpec.createOptions().contains(CreateOption.compress), 
modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded), 
modelSpec.createOptions().contains(CreateOption.createParentsAsContainers));
+        cache = TreeCache.newBuilder(client, basePath.fullPath())
+            .setCacheData(false)
+            
.setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
+            .setExecutor(executor)
+            
.setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded)
 || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
+            .build();
     }
 
     public void start()
     {
-        cache.addListener(this);
-        cache.start();
+        try
+        {
+            cache.getListenable().addListener(this);
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public void close()
     {
+        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
@@ -114,7 +128,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
-    Listenable<ModeledCacheListener<T>> listenable()
+    public Listenable<ModeledCacheListener<T>> listenable()
     {
         return listenerContainer;
     }
@@ -130,11 +144,14 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
         {
             ThreadUtils.checkInterrupted(e);
 
-            listenerContainer.forEach(l -> l.handleException(e));
+            listenerContainer.forEach(l -> {
+                l.handleException(e);
+                return null;
+            });
         }
     }
 
-    private void internalChildEvent(TreeCacheEvent event)
+    private void internalChildEvent(TreeCacheEvent event) throws Exception
     {
         switch ( event.getType() )
         {
@@ -171,7 +188,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
         case INITIALIZED:
         {
-            listenerContainer.forEach(ModeledCacheListener::initialized);
+            listenerContainer.forEach(l -> {
+                l.initialized();
+                return null;
+            });
             break;
         }
 
@@ -183,6 +203,9 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
 
     private void accept(ModeledCacheListener.Type type, ZPath path, Stat stat, 
T model)
     {
-        listenerContainer.forEach(l -> l.accept(type, path, stat, model));
+        listenerContainer.forEach(l -> {
+            l.accept(type, path, stat, model);
+            return null;
+        });
     }
 }
diff --git a/src/site/confluence/utilities.confluence 
b/src/site/confluence/utilities.confluence
index 2bd7ac1..c44efa8 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -42,6 +42,23 @@ CuratorFramework client = CuratorFrameworkFactory.builder()
 // all connection state listeners set for "client" will get circuit breaking 
behavior
 {code}
 
+h2. Functional Watcher Builder and Helpers
+
+See: 
[[Watchers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java]]
 and
+[[WatcherHelpers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java]]
+
+Especially useful with the new Persistent/Recursive Watcher support, this 
builder and helpers allow creating Watchers from
+functional methods and utilities that chain together. E.g.
+
+{code}
+Watcher watcher = Watchers.filter(WatcherHelpers.filterNonSystemEvents())
+                          .map(WatcherHelpers::mapToPathAndNode)
+                          .process(pathAndNode -> {
+                              ... etc ...
+                          });
+... use the watcher in a Curator method ...
+{code}
+
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using 
Curator locks safer:

Reply via email to