This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher
in repository https://gitbox.apache.org/repos/asf/curator.git

commit ecd06e4a08d2c2a83508cf7257addb8f3c1bcebe
Author: randgalt <[email protected]>
AuthorDate: Wed Oct 2 20:09:32 2019 -0500

    Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
---
 .../curator/utils/DefaultZookeeperFactory.java     |   3 +-
 .../apache/curator/framework/CuratorFramework.java |   6 +
 .../framework/api/AddPersistentWatchBuilder.java   |  21 ++-
 .../framework/api/AddPersistentWatchBuilder2.java  |  16 +-
 .../framework/api/AddPersistentWatchable.java      |  27 ++--
 .../curator/framework/api/CuratorEventType.java    |   7 +-
 .../imps/AddPersistentWatchBuilderImpl.java        | 169 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |   7 +
 .../imps/CuratorMultiTransactionRecord.java        |  34 +++--
 .../framework/imps/ReconfigBuilderImpl.java        |  18 ++-
 .../apache/curator/framework/imps/Watching.java    |  10 +-
 .../curator/framework/imps/TestFramework.java      |  50 ++++++
 .../recipes/leader/ChaosMonkeyCnxnFactory.java     |   7 +-
 curator-test/pom.xml                               |  10 ++
 .../apache/curator/test/TestingQuorumPeerMain.java |   3 +-
 .../apache/curator/test/TestingZooKeeperMain.java  |   5 +-
 .../org/apache/curator/test/WatchersDebug.java     |   9 ++
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |   7 +
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  33 ++++
 .../x/async/details/AsyncCuratorFrameworkImpl.java |   6 +
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 +++++++++
 .../curator/framework/imps/TestFramework.java      |  57 +++++++
 pom.xml                                            |  23 ++-
 23 files changed, 542 insertions(+), 61 deletions(-)

diff --git 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
 
b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
index 42279d0..acd32e7 100644
--- 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ 
b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
@@ -20,12 +20,13 @@ package org.apache.curator.utils;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 
 public class DefaultZookeeperFactory implements ZookeeperFactory
 {
     @Override
     public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, 
Watcher watcher, boolean canBeReadOnly) throws Exception
     {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, 
canBeReadOnly);
+        return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, 
canBeReadOnly);
     }
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 3737faa..7498367 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -199,6 +199,12 @@ public interface CuratorFramework extends Closeable
     public RemoveWatchesBuilder watches();
 
     /**
+     * Start a persistent watch builder
+     *
+     * @return builder object
+     */
+    public AddPersistentWatchBuilder addPersistentWatch();
+    /**
      * Returns the listenable interface for the Connect State
      *
      * @return listenable
diff --git 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
similarity index 65%
copy from 
curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to 
curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
index 42279d0..a167174 100644
--- 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
@@ -16,16 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, 
Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, 
canBeReadOnly);
-    }
-}
+    /**
+     * ZooKeeper persistent watches can optionally be recursive. See
+     * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, 
org.apache.zookeeper.Watcher, boolean)}
+     *
+     * @return this
+     */
+    AddPersistentWatchBuilder2 recursive();
+}
\ No newline at end of file
diff --git 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
similarity index 65%
copy from 
curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to 
curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
index 42279d0..15cea4f 100644
--- 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
@@ -16,16 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddPersistentWatchBuilder2 extends
+    Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
+    AddPersistentWatchable<Pathable<Void>>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, 
Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, 
canBeReadOnly);
-    }
-}
+}
\ No newline at end of file
diff --git 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
similarity index 68%
copy from 
curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
copy to 
curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
index 42279d0..faa8906 100644
--- 
a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
@@ -16,16 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.curator.utils;
+package org.apache.curator.framework.api;
 
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
-public class DefaultZookeeperFactory implements ZookeeperFactory
+public interface AddPersistentWatchable<T>
 {
-    @Override
-    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, 
Watcher watcher, boolean canBeReadOnly) throws Exception
-    {
-        return new ZooKeeper(connectString, sessionTimeout, watcher, 
canBeReadOnly);
-    }
-}
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a {@link CuratorWatcher} for the operation
+     *
+     * @param watcher the Curator watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5dea211..4766ca5 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -96,5 +96,10 @@ public enum CuratorEventType
     /**
      * Event sent when client is being closed
      */
-    CLOSING
+    CLOSING,
+
+    /**
+     * Corresponds to {@link CuratorFramework#addPersistentWatch()}
+     */
+    ADD_PERSISTENT_WATCH
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..acb70c8
--- /dev/null
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddPersistentWatchBuilderImpl implements 
AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    private Backgrounding backgrounding = new Backgrounding();
+    private boolean recursive = false;
+
+    AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching 
watching, Backgrounding backgrounding, boolean recursive)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.recursive = recursive;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding(true); // no-arg form is the field's default and would leave this a no-op
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchBuilder2 recursive()
+    {
+        recursive = true;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> 
inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> 
inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> 
inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, executor); // pass client like the context+executor overload does
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> 
inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, 
path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), 
null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> 
data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = 
client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background");
+            client.getZooKeeper().addPersistentWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    recursive, (rc, path1, ctx) -> {
+                        
trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, 
CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, 
null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = 
client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                client.getZooKeeper().addPersistentWatch(fixedPath, 
watching.getWatcher(path), recursive);
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e003bf0..e6bc1ce 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -571,6 +571,13 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         return new RemoveWatchesBuilderImpl(this);
     }
 
+    @Override
+    public AddPersistentWatchBuilder addPersistentWatch()
+    {
+        Preconditions.checkState(!isZk34CompatibilityMode(), "Persistent 
watches APIs are not supported when running in ZooKeeper 3.4 compatibility mode");
+        return new AddPersistentWatchBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object 
context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, 
context);
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 3e72609..fbac6e6 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -16,49 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.transaction.OperationType;
 import org.apache.curator.framework.api.transaction.TypeAndPath;
-import org.apache.zookeeper.MultiTransactionRecord;
 import org.apache.zookeeper.Op;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
-class CuratorMultiTransactionRecord extends MultiTransactionRecord
+class CuratorMultiTransactionRecord implements Iterable<Op>
 {
-    private final List<TypeAndPath>     metadata = Lists.newArrayList();
-
-    @Override
-    public final void add(Op op)
-    {
-        throw new UnsupportedOperationException();
-    }
+    private final List<TypeAndPath> metadata = Lists.newArrayList();
+    private List<Op> ops = new ArrayList<>();
 
     void add(Op op, OperationType type, String forPath)
     {
-        super.add(op);
+        ops.add(op);
         metadata.add(new TypeAndPath(type, forPath));
     }
 
-    TypeAndPath     getMetadata(int index)
+    TypeAndPath getMetadata(int index)
     {
         return metadata.get(index);
     }
 
-    int             metadataSize()
+    int metadataSize()
     {
         return metadata.size();
     }
 
     void addToDigest(MessageDigest digest)
     {
-        for ( Op op : this )
+        for ( Op op : ops )
         {
             digest.update(op.getPath().getBytes());
             digest.update(Integer.toString(op.getType()).getBytes());
             digest.update(op.toRequestRecord().toString().getBytes());
         }
     }
+
+    @Override
+    public Iterator<Op> iterator()
+    {
+        return ops.iterator();
+    }
+
+    int size()
+    {
+        return ops.size();
+    }
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 97be59a..f8b78da 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -24,6 +24,8 @@ import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
 import org.apache.curator.framework.api.*;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import java.util.Arrays;
@@ -268,7 +270,7 @@ public class ReconfigBuilderImpl implements 
ReconfigBuilder, BackgroundOperation
                     client.processBackgroundOperation(data, event);
                 }
             };
-            client.getZooKeeper().reconfig(joining, leaving, newMembers, 
fromConfig, callback, backgrounding.getContext());
+            admin().reconfigure(joining, leaving, newMembers, fromConfig, 
callback, backgrounding.getContext());
         }
         catch ( Throwable e )
         {
@@ -276,6 +278,18 @@ public class ReconfigBuilderImpl implements 
ReconfigBuilder, BackgroundOperation
         }
     }
 
+    private ZooKeeperAdmin admin() throws Exception
+    {
+        try
+        {
+            return (ZooKeeperAdmin)client.getZooKeeper();
+        }
+        catch ( ClassCastException e )
+        {
+            throw new Exception("ZooKeeper instance is not an instance of 
ZooKeeperAdmin", e); // keep the failed cast as the cause
+        }
+    }
+
     private byte[] ensembleInForeground() throws Exception
     {
         TimeTrace trace = 
client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Foreground");
@@ -287,7 +301,7 @@ public class ReconfigBuilderImpl implements 
ReconfigBuilder, BackgroundOperation
                     @Override
                     public byte[] call() throws Exception
                     {
-                        return client.getZooKeeper().reconfig(joining, 
leaving, newMembers, fromConfig, responseStat);
+                        return admin().reconfigure(joining, leaving, 
newMembers, fromConfig, responseStat);
                     }
                 }
             );
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index fe49ad7..5c9dd85 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -1265,4 +1265,54 @@ public class TestFramework extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            
client.addPersistentWatch().recursive().usingWatcher(watcher).forPath("/top/main");
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main/a");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getType(), 
Watcher.Event.EventType.NodeDataChanged);
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testPersistentWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            
client.addPersistentWatch().usingWatcher(watcher).forPath("/top/main");
+
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 4cb342c..f3dda40 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public class ChaosMonkeyCnxnFactory extends 
NIOServerCnxnFactory
                 log.debug("Rejected : " + si.toString());
                 // Still reject request
                 log.debug("Still not ready for " + remaining + "ms");
-                ((NIOServerCnxn)si.cnxn).close();
+                
((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                 return;
             }
             // Submit the request to the legacy Zookeeper server
@@ -113,13 +114,13 @@ public class ChaosMonkeyCnxnFactory extends 
NIOServerCnxnFactory
                         firstError = System.currentTimeMillis();
                         // The znode has been created, close the connection 
and don't tell it to client
                         log.warn("Closing connection right after " + 
createRequest.getPath() + " creation");
-                        ((NIOServerCnxn)si.cnxn).close();
+                        
((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                     }
                 }
                 catch ( Exception e )
                 {
                     // Should not happen
-                    ((NIOServerCnxn)si.cnxn).close();
+                    
((NIOServerCnxn)si.cnxn).close(ServerCnxn.DisconnectReason.UNKNOWN);
                 }
             }
         }
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 3683b7d..4f0c5a2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -41,6 +41,16 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java 
b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
index 3b3ab26..4baaff9 100644
--- 
a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
+++ 
b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.test;
 
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
@@ -39,7 +40,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements 
ZooKeeperMainFace
                 Field               cnxnFactoryField = 
QuorumPeer.class.getDeclaredField("cnxnFactory");
                 cnxnFactoryField.setAccessible(true);
                 ServerCnxnFactory   cnxnFactory = 
(ServerCnxnFactory)cnxnFactoryField.get(quorumPeer);
-                cnxnFactory.closeAll();
+                cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN);
 
                 Field               ssField = 
cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java 
b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index 841df77..8e5ffee 100644
--- 
a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ 
b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@ -23,6 +23,7 @@ import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
 import org.apache.zookeeper.server.ContainerManager;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZKDatabase;
@@ -81,7 +82,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace
         {
             if ( cnxnFactory != null )
             {
-                cnxnFactory.closeAll();
+                cnxnFactory.closeAll(ServerCnxn.DisconnectReason.UNKNOWN);
 
                 Field ssField = cnxnFactory.getClass().getDeclaredField("ss");
                 ssField.setAccessible(true);
@@ -262,7 +263,7 @@ public class TestingZooKeeperMain implements 
ZooKeeperMainFace
     {
         public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
         {
-            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), 
config.getMaxSessionTimeout(), null);
+            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), 
config.getMaxSessionTimeout(), config.getClientPortListenBacklog(), new 
ZKDatabase(txnLog), "");
         }
 
         private final AtomicBoolean isRunning = new AtomicBoolean(false);
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java 
b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
index e4c3b7e..e884b8c 100644
--- a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
+++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
@@ -27,16 +27,19 @@ public class WatchersDebug
     private static final Method getDataWatches;
     private static final Method getExistWatches;
     private static final Method getChildWatches;
+    private static final Method getPersistentWatches;
     static
     {
         Method localGetDataWatches = null;
         Method localGetExistWatches = null;
         Method localGetChildWatches = null;
+        Method localGetPersistentWatches = null;
         try
         {
             localGetDataWatches = getMethod("getDataWatches");
             localGetExistWatches = getMethod("getExistWatches");
             localGetChildWatches = getMethod("getChildWatches");
+            localGetPersistentWatches = getMethod("getPersistentWatches");
         }
         catch ( NoSuchMethodException e )
         {
@@ -45,6 +48,7 @@ public class WatchersDebug
         getDataWatches = localGetDataWatches;
         getExistWatches = localGetExistWatches;
         getChildWatches = localGetChildWatches;
+        getPersistentWatches = localGetPersistentWatches;
     }
 
     public static List<String> getDataWatches(ZooKeeper zooKeeper)
@@ -62,6 +66,11 @@ public class WatchersDebug
         return callMethod(zooKeeper, getChildWatches);
     }
 
+    public static List<String> getPersistentWatches(ZooKeeper zooKeeper)
+    { // debug accessor: uses the Method looked up by name ("getPersistentWatches") in the static initializer
+        return callMethod(zooKeeper, getPersistentWatches); // NOTE(review): the Method may be null if that lookup failed - confirm callMethod copes
+    }
+
     private WatchersDebug()
     {
     }
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..c1748d0 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends 
WatchableAsyncCuratorFramework
     AsyncReconfigBuilder reconfig();
 
     /**
+     * Start a persistent watch builder; the watch is recursive only if {@link AsyncPersistentWatchBuilder#recursive()} is called
+     *
+     * @return builder object
+     */
+    AsyncPersistentWatchBuilder addPersistentWatch();
+
+    /**
      * Start a transaction builder
      *
      * @return builder object
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
new file mode 100644
index 0000000..0f29233
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.api;
+
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.x.async.AsyncStage;
+
+/**
+ * Async DSL entry point for adding a persistent watch: optionally call {@link #recursive()},
+ * then supply the watcher via {@code usingWatcher(...)} and finally the path to watch.
+ */
+public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>
+{
+    /**
+     * ZooKeeper persistent watches can optionally be recursive. See
+     * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+     *
+     * @return this builder, narrowed to the step where the watcher is supplied
+     */
+    AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive();
+}
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..7204513 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements 
AsyncCuratorFramework
     }
 
     @Override
+    public AsyncPersistentWatchBuilder addPersistentWatch()
+    {
+        return new AsyncPersistentWatchBuilderImpl(client, filters); // fresh builder per call, given this framework's client and filters
+    }
+
+    @Override
     public AsyncMultiTransaction transaction()
     {
         return operations -> {
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..14f3e30
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+/**
+ * Package-private implementation of {@link AsyncPersistentWatchBuilder}. A single instance serves as
+ * every step of the DSL chain, so the recursive flag and watcher are recorded as mutable state until
+ * {@link #forPath(String)} hands them to the underlying {@link AddPersistentWatchBuilderImpl}.
+ */
+class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AsyncPathable<AsyncStage<Void>>
+{
+    private final CuratorFrameworkImpl client;
+    private final Filters filters;
+    // stays null until one of the usingWatcher() overloads is called
+    private Watching watching = null;
+    private boolean recursive = false;
+
+    /**
+     * @param client framework instance used to build the watch
+     * @param filters filters handed to {@link BuilderCommon} when the watch is added
+     */
+    AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+    {
+        this.client = client;
+        this.filters = filters;
+    }
+
+    /**
+     * Mark the watch as recursive.
+     *
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
+    {
+        recursive = true;
+        return this;
+    }
+
+    /**
+     * Use the given ZooKeeper watcher for the persistent watch.
+     *
+     * @param watcher watcher to register
+     * @return this
+     */
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    /**
+     * Use the given Curator watcher for the persistent watch.
+     *
+     * @param watcher watcher to register
+     * @return this
+     */
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    /**
+     * Add the watch for the given path using the state collected so far.
+     *
+     * @param path path to watch
+     * @return stage returned by {@code safeCall} for the background operation
+     */
+    @Override
+    public AsyncStage<Void> forPath(String path)
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+        AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
+        return safeCall(common.internalCallback, () -> builder.forPath(path));
+    }
+}
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index 27a84d0..10db2d4 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -30,6 +30,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -39,6 +40,8 @@ import org.apache.curator.x.async.api.DeleteOption;
 import org.apache.curator.x.async.api.ExistsOption;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.testng.Assert;
@@ -657,4 +660,58 @@ public class TestFramework extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            // queue every event handed to the watcher so the assertions below can consume them in arrival order
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            
async.addPersistentWatch().recursive().usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get();
+            // expected: events for /top/main and then /top/main/a from the create, then one event per later setData
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main/a");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getType(), 
Watcher.Event.EventType.NodeDataChanged);
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testPersistentWatch() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            // same setup as the recursive test, but recursive() is not called on the builder
+            BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+            Watcher watcher = events::add;
+            
async.addPersistentWatch().usingWatcher(watcher).forPath("/top/main").toCompletableFuture().get();
+            // NOTE(review): only paths are asserted; an event already queued for /top/main could satisfy the last check - consider asserting types
+            client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+            client.setData().forPath("/top/main/a", "foo".getBytes());
+            client.setData().forPath("/top/main", "bar".getBytes());
+            Assert.assertEquals(timing.takeFromQueue(events).getPath(), 
"/top/main");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git a/pom.xml b/pom.xml
index 3a5e152..27b34cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
         <jdk-version>1.${short-jdk-version}</jdk-version>
 
         <!-- versions -->
-        <zookeeper-version>3.5.5</zookeeper-version>
+        <zookeeper-version>3.6.0-SNAPSHOT</zookeeper-version>
         <maven-bundle-plugin-version>4.1.0</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
         <doxia-module-confluence-version>1.8</doxia-module-confluence-version>
@@ -85,10 +85,11 @@
         <guava-failureaccess-version>1.0.1</guava-failureaccess-version>
         <testng-version>6.14.3</testng-version>
         <swift-version>0.23.1</swift-version>
-        <dropwizard-version>1.3.7</dropwizard-version>
         <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version>
         <slf4j-version>1.7.25</slf4j-version>
         <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version>
+        <dropwizard-version>3.2.5</dropwizard-version>
+        <snappy-version>1.1.7</snappy-version>
 
         <!-- OSGi Properties -->
         <osgi.export.package />
@@ -567,6 +568,24 @@
                 <artifactId>dropwizard-logging</artifactId>
                 <version>${dropwizard-version}</version>
             </dependency>
+            <!-- NOTE(review): reuses ${dropwizard-version}, which also versions dropwizard-logging above; confirm the value suits both -->
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard-version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>${snappy-version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

Reply via email to