This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch persistent-watcher-functional in repository https://gitbox.apache.org/repos/asf/curator.git
commit 46dab87492affb994c9be08655862a2c40c31c97 Author: randgalt <[email protected]> AuthorDate: Mon Oct 14 15:01:01 2019 +0300 Added some functional utilities that will aid using persistent/recursive watchers --- .../framework/recipes/watch/WatcherHelpers.java | 79 +++++++++++++ .../curator/framework/recipes/watch/Watchers.java | 93 +++++++++++++++ .../framework/recipes/watch/WatchersImpl.java | 110 ++++++++++++++++++ .../framework/recipes/watch/TestWatchers.java | 125 +++++++++++++++++++++ src/site/confluence/utilities.confluence | 17 +++ 5 files changed, 424 insertions(+) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java new file mode 100644 index 0000000..4db368a --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import java.util.Collections; +import java.util.List; +import java.util.function.IntPredicate; +import java.util.function.Predicate; + +public class WatcherHelpers +{ + private static final ZKPaths.PathAndNode NULL_PATH_AND_NODE = new ZKPaths.PathAndNode("/", ""); + + public static ZKPaths.PathAndNode mapToPathAndNode(WatchedEvent event) + { + if ( event.getType() == Watcher.Event.EventType.None ) + { + return NULL_PATH_AND_NODE; + } + return ZKPaths.getPathAndNode(event.getPath()); + } + + public static List<String> mapToParts(WatchedEvent event) + { + if ( event.getType() == Watcher.Event.EventType.None ) + { + return Collections.emptyList(); + } + return ZKPaths.split(event.getPath()); + } + + public static Predicate<WatchedEvent> filterNode(Predicate<String> nodeFilter) + { + return event -> nodeFilter.test(mapToPathAndNode(event).getNode()); + } + + public static Predicate<WatchedEvent> filterPath(Predicate<String> pathFilter) + { + return event -> pathFilter.test(mapToPathAndNode(event).getPath()); + } + + public static Predicate<WatchedEvent> filterDepth(IntPredicate depthFilter) + { + return event -> depthFilter.test(mapToParts(event).size()); + } + + public static Predicate<WatchedEvent> filterSystemEvents() + { + return event -> event.getType() == Watcher.Event.EventType.None; + } + + public static Predicate<WatchedEvent> filterNonSystemEvents() + { + return event -> event.getType() != Watcher.Event.EventType.None; + } + + private WatcherHelpers() + { + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java new file mode 100644 index 0000000..600a7ec --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +/** + * Functional builders to wrap a ZooKeeper Watcher. Call the filter/map/etc. methods + * as needed and complete by calling {@link Next#process(java.util.function.Consumer)} which + * returns a Watcher that can be passed to any Curator/ZooKeeper method that accepts Watchers. + */ +public interface Watchers +{ + /** + * Filter the chained watched event. If the predicate returns false, the chain ends and no further processing + * occurs. + * + * @param filter predicate + * @return chain + */ + static Next<WatchedEvent> filter(Predicate<WatchedEvent> filter) + { + return new WatchersImpl<>(filter); + } + + /** + * Map the chained watched event to a new object + * + * @param mapper mapper + * @return chain + */ + static <R> Next<R> map(Function<WatchedEvent, ? super R> mapper) + { + return new WatchersImpl<>(mapper); + } + + interface Next<T> + { + /** + * Filter the chained value. If the predicate returns false, the chain ends and no further processing + * occurs. + * + * @param filter predicate + * @return chain + */ + Next<T> filter(Predicate<? super T> filter); + + /** + * Peek at the chained value + * + * @param consumer consumer + * @return chain + */ + Next<T> peek(Consumer<? super T> consumer); + + /** + * Map the chained value to a new object + * + * @param mapper mapper + * @return chain + */ + <R> Next<R> map(Function<? super T, ? extends R> mapper); + + /** + * Complete the chain and return a new Watcher that consists of all the previous steps in the + * chain. When the Watcher is called by ZooKeeper, all steps of the chain are called in order. + * + * @param handler consumer for the final value of the chain + * @return new Watcher + */ + Watcher process(Consumer<T> handler); + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java new file mode 100644 index 0000000..ed0e02f --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +public class WatchersImpl<T> implements Watchers.Next<T> +{ + private final Step step; + + private static class Step + { + private final Predicate filter; + private final Function mapper; + + private final Step head; + private Step next; + + Step(Predicate filter, Function mapper) + { + this(filter, mapper, null); + } + + Step(Predicate filter, Function mapper, Step head) + { + this.filter = (filter != null) ? filter : (__ -> true); + this.mapper = (mapper != null) ? mapper : Function.identity(); + this.head = (head != null) ? head : this; + } + } + + WatchersImpl(Predicate<WatchedEvent> filter) + { + step = new Step(filter, null); + } + + <R> WatchersImpl(Function<WatchedEvent, ? super R> mapper) + { + step = new Step(null, mapper); + } + + private WatchersImpl(Step step) + { + this.step = step; + } + + @Override + public Watchers.Next<T> filter(Predicate<? super T> filter) + { + this.step.next = new Step(filter, null, step.head); + return new WatchersImpl<>(this.step.next); + } + + @Override + public Watchers.Next<T> peek(Consumer<? super T> consumer) + { + return filter(value -> { + consumer.accept(value); + return true; + }); + } + + @Override + public <R> Watchers.Next<R> map(Function<? super T, ? extends R> mapper) + { + this.step.next = new Step(null, mapper, step.head); + return new WatchersImpl<>(this.step.next); + } + + @Override + @SuppressWarnings("unchecked") // all public APIs are correctly typed so this is safe + public Watcher process(Consumer<T> handler) + { + return event -> { + Object value = event; + Step currentStep = step.head; + while ( currentStep != null ) + { + if ( !currentStep.filter.test(value) ) + { + return; // exit lambda + } + value = currentStep.mapper.apply(value); + currentStep = currentStep.next; + } + + ((Consumer)handler).accept(value); + }; + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java new file mode 100644 index 0000000..45185b2 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.watch; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.compatibility.Timing2; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; + +import static org.apache.curator.framework.recipes.watch.WatcherHelpers.*; +import static org.apache.curator.framework.recipes.watch.Watchers.filter; +import static org.apache.curator.framework.recipes.watch.Watchers.map; + +public class TestWatchers extends BaseClassForTests +{ + private final Timing2 timing = new Timing2(); + + @Test + public void testFilterFirst() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + Semaphore latch = new Semaphore(0); + Watcher watcher = filter(filterNonSystemEvents()) + .filter(event -> event.getType() == Watcher.Event.EventType.NodeCreated) + .map(WatcherHelpers::mapToPathAndNode) + .filter(z -> z.getPath().equals("/one/two")) + .process(__ -> latch.release()); + client.checkExists().usingWatcher(watcher).forPath("/one/two/three"); + + client.create().forPath("/one"); + timing.sleepABit(); + Assert.assertEquals(latch.availablePermits(), 0); + + client.create().forPath("/one/two"); + timing.sleepABit(); + Assert.assertEquals(latch.availablePermits(), 0); + + client.create().forPath("/one/two/three"); + Assert.assertTrue(timing.acquireSemaphore(latch, 1)); + } + } + + @Test + public void testMapFirst() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + BlockingQueue<String> result = new LinkedBlockingQueue<>(); + BlockingQueue<String> peekedResult = new LinkedBlockingQueue<>(); + Watcher watcher = map(event -> "The path is: " + event.getPath()) + .peek(peekedResult::add) + .process(result::add); + client.checkExists().usingWatcher(watcher).forPath("/one"); + + client.create().forPath("/one"); + Assert.assertEquals(timing.takeFromQueue(peekedResult), "The path is: /one"); + Assert.assertEquals(timing.takeFromQueue(result), "The path is: /one"); + } + } + + @Test + public void testSystemEvents() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + CountDownLatch latch = new CountDownLatch(1); + Watcher watcher = filter(filterSystemEvents()) + .process(__ -> latch.countDown()); + client.checkExists().usingWatcher(watcher).forPath("/one"); + server.stop(); + Assert.assertTrue(timing.awaitLatch(latch)); + } + } + + @Test + public void testHelpers() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + BlockingQueue<String> result = new LinkedBlockingQueue<>(); + Watcher watcher = filter(filterPath(node -> node.startsWith("/a"))) + .filter(filterNode(node -> node.equals("c"))) + .filter(filterDepth(depth -> depth > 1)) + .map(WatcherHelpers::mapToParts) + .map(parts -> String.join("|", parts)) + .process(result::add); + client.checkExists().usingWatcher(watcher).forPath("/a/b/c"); + client.create().creatingParentsIfNeeded().forPath("/a/b/c"); + Assert.assertEquals(timing.takeFromQueue(result), "a|b|c"); + } + } +} diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence index 2bd7ac1..c44efa8 100644 --- a/src/site/confluence/utilities.confluence +++ b/src/site/confluence/utilities.confluence @@ -42,6 +42,23 @@ CuratorFramework client = CuratorFrameworkFactory.builder() // all connection state listeners set for "client" will get circuit breaking behavior {code} +h2. Functional Watcher Builder and Helpers + +See: [[Watchers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java]] and +[[WatcherHelpers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java]] + +Especially useful with the new Persistent/Recursive Watcher support, this builder and helpers allow creating Watchers from +functional methods and utilities that chain together. E.g. + +{code} +Watcher watcher = Watchers.filter(WatcherHelpers.filterNonSystemEvents()) + .map(WatcherHelpers::mapToPathAndNode) + .process(pathAndNode -> { + ... etc ... + }); +... use the watcher in a Curator method ... +{code} + h2. Locker Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer:
