This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f5474b1  Downgraded ZK version to 3.4.13
f5474b1 is described below

commit f5474b1380b73819ba9f85b42d006f9ad7e45aa5
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Aug 17 13:04:22 2018 -0700

    Downgraded ZK version to 3.4.13
    
    ### Motivation
    
    BK 4.7 has shipped with newer ZooKeeper client 3.5.3-beta. In many cases, 
there has been concerns regarding having a dependency on a "beta" release. 
Irrespective of the seriousness of these concerns for the specific case of ZK 
which has been in alpha/beta for a very long time, we should not have 
dependency on versions of software for which the team itself has marked as "not 
yet stable".
    
    Adding to that, there is no clear roadmap or ETA for when final 3.5.x 
stable release will be available.
    
    ### Changes
    
     * Downgraded ZK to latest stable version 3.4.13
     * Adjusted few usages of new client APIs that were introduced in 3.5.x
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>
    
    This closes #1601 from merlimat/downgrade-zk
---
 .../benchmark/BenchReadThroughputLatency.java      |  25 ++---
 bookkeeper-dist/src/assemble/bin-all.xml           |   1 +
 bookkeeper-dist/src/assemble/bin-server.xml        |   1 +
 .../src/main/resources/LICENSE-all.bin.txt         |  12 ++-
 .../src/main/resources/LICENSE-server.bin.txt      |  12 ++-
 .../src/main/resources/deps/jline-0.9.94/LICENSE   |  32 ++++++
 .../bookkeeper/discover/ZKRegistrationClient.java  |  17 +--
 .../bookkeeper/zookeeper/ZooKeeperClient.java      | 119 +--------------------
 .../bookkeeper/client/TestBookieWatcher.java       |  16 +--
 .../discover/TestZkRegistrationClient.java         |  28 +----
 .../replication/TestReplicationWorker.java         |   2 +-
 .../bookkeeper/zookeeper/TestZooKeeperClient.java  |  15 +--
 pom.xml                                            |   2 +-
 .../distributedlog/bk/SimpleLedgerAllocator.java   |  35 +++---
 .../apache/distributedlog/zk/ZKWatcherManager.java |  24 -----
 .../bookkeeper/tests/containers/ZKContainer.java   |   9 +-
 .../integration/utils/BookKeeperClusterUtils.java  |  53 ++++-----
 17 files changed, 137 insertions(+), 266 deletions(-)

diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
index 750984f..18c73e8 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -42,7 +43,6 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +55,7 @@ public class BenchReadThroughputLatency {
     private static final Pattern LEDGER_PATTERN = 
Pattern.compile("L([0-9]+)$");
 
     private static final Comparator<String> ZK_LEDGER_COMPARE = new 
Comparator<String>() {
+        @Override
         public int compare(String o1, String o2) {
             try {
                 Matcher m1 = LEDGER_PATTERN.matcher(o1);
@@ -186,28 +187,21 @@ public class BenchReadThroughputLatency {
         }
 
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
-        final CountDownLatch connectedLatch = new CountDownLatch(1);
         final String nodepath = String.format("/ledgers/L%010d", ledger.get());
 
         final ClientConfiguration conf = new ClientConfiguration();
         conf.setReadTimeout(sockTimeout).setZkServers(servers);
 
-        try (ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
-                public void process(WatchedEvent event) {
-                    if (event.getState() == Event.KeeperState.SyncConnected
-                            && event.getType() == Event.EventType.None) {
-                        connectedLatch.countDown();
-                    }
-                }
-        })) {
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
+                .connectString(servers)
+                .sessionTimeoutMs(3000)
+                .build()) {
             final Set<String> processedLedgers = new HashSet<String>();
             zk.register(new Watcher() {
+                    @Override
                     public void process(WatchedEvent event) {
                         try {
-                            if (event.getState() == 
Event.KeeperState.SyncConnected
-                                && event.getType() == Event.EventType.None) {
-                                connectedLatch.countDown();
-                            } else if (event.getType() == 
Event.EventType.NodeCreated
+                            if (event.getType() == Event.EventType.NodeCreated
                                        && event.getPath().equals(nodepath)) {
                                 readLedger(conf, ledger.get(), passwd);
                                 shutdownLatch.countDown();
@@ -233,6 +227,7 @@ public class BenchReadThroughputLatency {
                                             final Long ledgerId = 
Long.valueOf(m.group(1));
                                             processedLedgers.add(ledger);
                                             Thread t = new Thread() {
+                                                @Override
                                                 public void run() {
                                                     readLedger(conf, ledgerId, 
passwd);
                                                 }
@@ -254,7 +249,7 @@ public class BenchReadThroughputLatency {
                         }
                     }
                 });
-            connectedLatch.await();
+
             if (ledger.get() != 0) {
                 if (zk.exists(nodepath, true) != null) {
                     readLedger(conf, ledger.get(), passwd);
diff --git a/bookkeeper-dist/src/assemble/bin-all.xml 
b/bookkeeper-dist/src/assemble/bin-all.xml
index a51d0e0..7b047c4 100644
--- a/bookkeeper-dist/src/assemble/bin-all.xml
+++ b/bookkeeper-dist/src/assemble/bin-all.xml
@@ -61,6 +61,7 @@
         <include>netty-4.1.22.Final/*</include>
         <include>paranamer-2.8/LICENSE.txt</include>
         <include>protobuf-3.0.0/LICENSE</include>
+        <include>jline-0.9.94/LICENSE</include>
         <include>protobuf-3.5.1/LICENSE</include>
         <include>scala-library-2.11.7/LICENSE.md</include>
         <include>scala-parser-combinators_2.11-1.0.4/LICENSE.md</include>
diff --git a/bookkeeper-dist/src/assemble/bin-server.xml 
b/bookkeeper-dist/src/assemble/bin-server.xml
index 761af96..aa7d1b8 100644
--- a/bookkeeper-dist/src/assemble/bin-server.xml
+++ b/bookkeeper-dist/src/assemble/bin-server.xml
@@ -53,6 +53,7 @@
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>netty-4.1.22.Final/*</include>
         <include>protobuf-3.0.0/LICENSE</include>
+        <include>jline-0.9.94/LICENSE</include>
         <include>protobuf-3.5.1/LICENSE</include>
         <include>slf4j-1.7.25/LICENSE.txt</include>
       </includes>
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt 
b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index f1bcc84..5303d52 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -262,7 +262,7 @@ Apache Software License, Version 2.
 - lib/net.java.dev.jna-jna-3.2.7.jar [30]
 - lib/org.apache.commons-commons-collections4-4.1.jar [31]
 - lib/org.apache.commons-commons-lang3-3.6.jar [32]
-- lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [33]
+- lib/org.apache.zookeeper-zookeeper-3.4.13.jar [33]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [34]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [34]
 - lib/org.eclipse.jetty-jetty-security-9.4.5.v20170502.jar [34]
@@ -298,6 +298,7 @@ Apache Software License, Version 2.
 - lib/org.apache.curator-curator-recipes-4.0.1.jar [47]
 - lib/org.inferred-freebuilder-1.14.9.jar [48]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [49]
+- lib/org.apache.yetus-audience-annotations-0.5.0.jar [50]
 
 [1] Source available at 
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at 
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -330,7 +331,7 @@ Apache Software License, Version 2.
 [30] Source available at https://github.com/java-native-access/jna/tree/3.2.7
 [31] Source available at 
https://git-wip-us.apache.org/repos/asf?p=commons-collections.git;a=tag;h=a3a5ad
 [32] Source available at 
https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=tag;h=3ad2e8
-[33] Source available at https://github.com/apache/zookeeper/tree/release-3.5.3
+[33] Source available at 
https://github.com/apache/zookeeper/tree/release-3.4.13
 [34] Source available at 
https://github.com/eclipse/jetty.project/tree/jetty-9.4.5.v20170502
 [35] Source available at https://github.com/facebook/rocksdb/tree/v5.13.1
 [36] Source available at 
https://github.com/cbeust/jcommander/tree/jcommander-1.48
@@ -347,6 +348,7 @@ Apache Software License, Version 2.
 [47] Source available at 
https://github.com/apache/curator/tree/apache-curator-4.0.1
 [48] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [49] Source available at https://github.com/google/error-prone/tree/v2.1.2
+[50] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
 
 
 
------------------------------------------------------------------------------------
@@ -547,4 +549,10 @@ license. For details, see 
deps/google-auth-library-credentials-0.9.0/LICENSE
 Bundled as
   - lib/com.google.auth-google-auth-library-credentials-0.9.0.jar
 Source available at 
https://github.com/google/google-auth-library-java/tree/0.9.0
+------------------------------------------------------------------------------------
+This product bundles the JLine Library, which is available under a "2-clause 
BSD"
+license. For details, see deps/jline-0.9.94/LICENSE
+
+Bundled as
+  - lib/jline-jline-0.9.94.jar
 
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt 
b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index d59d322..eac5714 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -227,7 +227,7 @@ Apache Software License, Version 2.
 - lib/net.java.dev.jna-jna-3.2.7.jar [17]
 - lib/org.apache.commons-commons-collections4-4.1.jar [18]
 - lib/org.apache.commons-commons-lang3-3.6.jar [19]
-- lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [20]
+- lib/org.apache.zookeeper-zookeeper-3.4.13.jar [20]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [21]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [21]
 - lib/org.eclipse.jetty-jetty-security-9.4.5.v20170502.jar [21]
@@ -263,6 +263,7 @@ Apache Software License, Version 2.
 - lib/org.apache.curator-curator-recipes-4.0.1.jar [34]
 - lib/org.inferred-freebuilder-1.14.9.jar [35]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [36]
+- lib/org.apache.yetus-audience-annotations-0.5.0.jar [37]
 
 [1] Source available at 
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at 
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -283,7 +284,7 @@ Apache Software License, Version 2.
 [17] Source available at https://github.com/java-native-access/jna/tree/3.2.7
 [18] Source available at 
https://git-wip-us.apache.org/repos/asf?p=commons-collections.git;a=tag;h=a3a5ad
 [19] Source available at 
https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=tag;h=3ad2e8
-[20] Source available at https://github.com/apache/zookeeper/tree/release-3.5.3
+[20] Source available at 
https://github.com/apache/zookeeper/tree/release-3.4.13
 [21] Source available at 
https://github.com/eclipse/jetty.project/tree/jetty-9.4.5.v20170502
 [22] Source available at https://github.com/facebook/rocksdb/tree/v5.13.1
 [23] Source available at 
https://github.com/cbeust/jcommander/tree/jcommander-1.48
@@ -300,6 +301,7 @@ Apache Software License, Version 2.
 [34] Source available at 
https://github.com/apache/curator/tree/apache-curator-4.0.1
 [35] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [36] Source available at https://github.com/google/error-prone/tree/v2.1.2
+[37] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
 
 
------------------------------------------------------------------------------------
 lib/io.netty-netty-all-4.1.22.Final.jar bundles some 3rd party dependencies
@@ -435,3 +437,9 @@ license. For details, see 
deps/google-auth-library-credentials-0.9.0/LICENSE
 Bundled as
   - lib/com.google.auth-google-auth-library-credentials-0.9.0.jar
 Source available at 
https://github.com/google/google-auth-library-java/tree/0.9.0
+------------------------------------------------------------------------------------
+This product bundles the JLine Library, which is available under a "2-clause 
BSD"
+license. For details, see deps/jline-0.9.94/LICENSE
+
+Bundled as
+  - lib/jline-jline-0.9.94.jar
diff --git a/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE 
b/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE
new file mode 100644
index 0000000..246f54f
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE
@@ -0,0 +1,32 @@
+Copyright (c) 2002-2006, Marc Prud'hommeaux <[email protected]>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index ff48a08..ce19751 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -66,7 +66,7 @@ public class ZKRegistrationClient implements 
RegistrationClient {
 
         private final String regPath;
         private final Set<RegistrationListener> listeners;
-        private boolean closed = false;
+        private volatile boolean closed = false;
         private Set<BookieSocketAddress> bookies = null;
         private Version version = Version.NEW;
         private final CompletableFuture<Void> firstRunFuture;
@@ -154,23 +154,12 @@ public class ZKRegistrationClient implements 
RegistrationClient {
             scheduleWatchTask(0L);
         }
 
-        synchronized boolean isClosed() {
+        boolean isClosed() {
             return closed;
         }
 
         @Override
-        public synchronized void close() {
-            if (closed) {
-                return;
-            }
-            zk.removeWatches(
-                regPath,
-                this,
-                WatcherType.Children,
-                true,
-                (rc, path, ctx) -> {},
-                null
-            );
+        public void close() {
             closed = true;
         }
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
index 24693ea..be037f5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
@@ -44,7 +44,6 @@ import org.apache.bookkeeper.zookeeper.ZooWorker.ZooCallable;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
@@ -68,7 +67,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Provide a zookeeper client to handle session expire.
  */
-public class ZooKeeperClient extends ZooKeeper implements Watcher {
+public class ZooKeeperClient extends ZooKeeper implements Watcher, 
AutoCloseable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ZooKeeperClient.class);
 
@@ -761,74 +760,6 @@ public class ZooKeeperClient extends ZooKeeper implements 
Watcher {
     }
 
     @Override
-    public String create(final String path,
-                         final byte[] data,
-                         final List<ACL> acl,
-                         final CreateMode createMode,
-                         final Stat stat)
-            throws KeeperException, InterruptedException {
-        return ZooWorker.syncCallWithRetries(this, new ZooCallable<String>() {
-
-            @Override
-            public String call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return ZooKeeperClient.super.create(path, data, acl, 
createMode);
-                }
-                return zkHandle.create(path, data, acl, createMode);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, 
acl, createMode);
-            }
-
-        }, operationRetryPolicy, rateLimiter, createStats);
-    }
-
-    @Override
-    public void create(final String path,
-                       final byte[] data,
-                       final List<ACL> acl,
-                       final CreateMode createMode,
-                       final Create2Callback cb,
-                       final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, createStats) {
-
-            final Create2Callback createCb = new Create2Callback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, 
String name, Stat stat) {
-                    ZooWorker worker = (ZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, name, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    ZooKeeperClient.super.create(path, data, acl, createMode, 
createCb, worker);
-                } else {
-                    zkHandle.create(path, data, acl, createMode, createCb, 
worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, 
acl, createMode);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
     public void delete(final String path, final int version) throws 
KeeperException, InterruptedException {
         ZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() {
 
@@ -1427,52 +1358,4 @@ public class ZooKeeperClient extends ZooKeeper 
implements Watcher {
         proc.run();
     }
 
-    @Override
-    public void removeWatches(String path, Watcher watcher, WatcherType 
watcherType, boolean local)
-            throws InterruptedException, KeeperException {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeWatches(path, watcher, watcherType, 
local);
-        } else {
-            zkHandle.removeWatches(path, watcher, watcherType, local);
-        }
-    }
-
-    @Override
-    public void removeWatches(String path,
-                              Watcher watcher,
-                              WatcherType watcherType,
-                              boolean local,
-                              VoidCallback cb,
-                              Object ctx) {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeWatches(path, watcher, watcherType, 
local, cb, ctx);
-        } else {
-            zkHandle.removeWatches(path, watcher, watcherType, local, cb, ctx);
-        }
-    }
-
-    @Override
-    public void removeAllWatches(String path, WatcherType watcherType, boolean 
local)
-            throws InterruptedException, KeeperException {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeAllWatches(path, watcherType, local);
-        } else {
-            zkHandle.removeAllWatches(path, watcherType, local);
-        }
-    }
-
-    @Override
-    public void removeAllWatches(String path, WatcherType watcherType, boolean 
local, VoidCallback cb, Object ctx) {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeAllWatches(path, watcherType, local, 
cb, ctx);
-        } else {
-            zkHandle.removeAllWatches(path, watcherType, local, cb, ctx);
-        }
-    }
-
-
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
index e8e82b7..63de59c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Cleanup;
+
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -69,7 +71,7 @@ public class TestBookieWatcher extends 
BookKeeperClusterTestCase {
     @Test
     public void testBookieWatcherSurviveWhenSessionExpired() throws Exception {
         final int timeout = 2000;
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .sessionTimeoutMs(timeout)
                 .build()) {
@@ -81,7 +83,9 @@ public class TestBookieWatcher extends 
BookKeeperClusterTestCase {
     public void testBookieWatcherDieWhenSessionExpired() throws Exception {
         final int timeout = 2000;
         final CountDownLatch connectLatch = new CountDownLatch(1);
-        try (ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 
timeout, new Watcher() {
+
+        @Cleanup
+        ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 
timeout, new Watcher() {
             @Override
             public void process(WatchedEvent watchedEvent) {
                 if (EventType.None == watchedEvent.getType()
@@ -89,10 +93,10 @@ public class TestBookieWatcher extends 
BookKeeperClusterTestCase {
                     connectLatch.countDown();
                 }
             }
-        })) {
-            connectLatch.await();
-            runBookieWatcherWhenSessionExpired(zk, timeout, false);
-        }
+        });
+
+        connectLatch.await();
+        runBookieWatcherWhenSessionExpired(zk, timeout, false);
     }
 
     private void runBookieWatcherWhenSessionExpired(ZooKeeper zk, int timeout, 
boolean reconnectable)
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
index 99336b4..1f5e0b6 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
@@ -33,8 +33,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -42,6 +40,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
+
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
@@ -49,7 +48,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.client.BKException.ZKException;
 import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
@@ -60,12 +61,10 @@ import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
@@ -94,6 +93,7 @@ public class TestZkRegistrationClient extends 
MockZooKeeperTestCase {
     private ScheduledExecutorService mockExecutor;
     private MockExecutorController controller;
 
+    @Override
     @Before
     public void setup() throws Exception {
         super.setup();
@@ -372,17 +372,6 @@ public class TestZkRegistrationClient extends 
MockZooKeeperTestCase {
             zkRegistrationClient.unwatchReadOnlyBookies(secondListener);
             assertEquals(1, 
zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners());
         }
-        // the watch task will not be closed since there is still a listener
-        verify(mockZk, times(0))
-            .removeWatches(
-                eq(isWritable ? regPath : regReadonlyPath),
-                same(isWritable ? 
zkRegistrationClient.getWatchWritableBookiesTask()
-                    : zkRegistrationClient.getWatchReadOnlyBookiesTask()),
-                eq(WatcherType.Children),
-                eq(true),
-                any(VoidCallback.class),
-                any()
-            );
 
         // trigger watcher
         notifyWatchedEvent(
@@ -421,15 +410,6 @@ public class TestZkRegistrationClient extends 
MockZooKeeperTestCase {
         }
         // the watch task will not be closed since there is still a listener
         assertTrue(expectedWatcher.isClosed());
-        verify(mockZk, times(1))
-            .removeWatches(
-                eq(isWritable ? regPath : regReadonlyPath),
-                same(expectedWatcher),
-                eq(WatcherType.Children),
-                eq(true),
-                any(VoidCallback.class),
-                any()
-            );
     }
 
     @Test
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index f64456b..e4b1232 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -655,7 +655,7 @@ public class TestReplicationWorker extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testRWZKConnectionLost() throws Exception {
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .sessionTimeoutMs(10000)
                 .build()) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
index 8e88418..bb9554a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -226,7 +225,7 @@ public class TestZooKeeperClient extends TestCase {
 
         expireZooKeeperSession(client, timeout);
         logger.info("Create children under znode " + path);
-        client.create(path + "/children2", data, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT, new Stat());
+        client.create(path + "/children2", data, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
 
         expireZooKeeperSession(client, timeout);
         List<String> children = client.getChildren(path, false, newStat);
@@ -791,16 +790,10 @@ public class TestZooKeeperClient extends TestCase {
         expireZooKeeperSession(client, timeout);
         logger.info("Create znode " + path);
         final CountDownLatch create2Latch = new CountDownLatch(1);
-        client.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
-                new Create2Callback() {
-
-            @Override
-            public void processResult(int rc, String path, Object ctx, String 
name, Stat stat) {
-                if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                    create2Latch.countDown();
-                }
+        client.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, 
(rc, path1, ctx, name) -> {
+            if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                create2Latch.countDown();
             }
-
         }, null);
         create2Latch.await();
         logger.info("Created znode " + path);
diff --git a/pom.xml b/pom.xml
index 471cec5..ee2b745 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,7 @@
     <testcontainers.version>1.7.0</testcontainers.version>
     <twitter-server.version>1.29.0</twitter-server.version>
     <vertx.version>3.4.1</vertx.version>
-    <zookeeper.version>3.5.3-beta</zookeeper.version>
+    <zookeeper.version>3.4.13</zookeeper.version>
     <!-- plugin dependencies -->
     <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
     <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index 367a97d..eb03923 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -18,12 +18,14 @@
 package org.apache.distributedlog.bk;
 
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
+
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -41,7 +43,6 @@ import org.apache.distributedlog.zk.ZKTransaction;
 import org.apache.distributedlog.zk.ZKVersionedSetOp;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,21 +130,19 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
             final CompletableFuture<Versioned<byte[]>> promise = new 
CompletableFuture<Versioned<byte[]>>();
             zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES,
                     zkc.getDefaultACL(), CreateMode.PERSISTENT,
-                    new org.apache.zookeeper.AsyncCallback.Create2Callback() {
-                        @Override
-                        public void processResult(int rc, String path, Object 
ctx, String name, Stat stat) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.complete(new 
Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
-                                        new LongVersion(stat.getVersion())));
-                            } else if 
(KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                                FutureUtils.proxyTo(
-                                  Utils.zkGetData(zkc, allocatePath, false),
-                                  promise
-                                );
-                            } else {
-                                
promise.completeExceptionally(Utils.zkException(
-                                        
KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
-                            }
+                    (rc, path, ctx, name) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            // Since the z-node was just created, we are sure 
at this point the version is 0
+                            promise.complete(new 
Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
+                                    new LongVersion(0)));
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() 
== rc) {
+                            FutureUtils.proxyTo(
+                              Utils.zkGetData(zkc, allocatePath, false),
+                              promise
+                            );
+                        } else {
+                            promise.completeExceptionally(Utils.zkException(
+                                    
KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
                         }
                     }, null);
             return promise;
@@ -489,7 +488,7 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
                     @Override
                     public void onFailure(Throwable cause) {
                         LOG.debug("Fail to obtain the allocated ledger handle 
when closing the allocator : ", cause);
-                        closePromise.complete(null);
+                        FutureUtils.complete(closePromise, null);
                     }
                 });
             }
@@ -497,7 +496,7 @@ public class SimpleLedgerAllocator implements 
LedgerAllocator, FutureEventListen
             @Override
             public void onFailure(Throwable cause) {
                 LOG.debug("Fail to obtain the allocated ledger handle when 
closing the allocator : ", cause);
-                closePromise.complete(null);
+                FutureUtils.complete(closePromise, null);
             }
         });
 
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
index 8c350c9..cbdae1d 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -165,28 +163,6 @@ public class ZKWatcherManager implements Watcher {
                 logger.warn("Remove a non-registered child watcher {} from 
path {}", watcher, path);
             }
             if (watchers.isEmpty()) {
-                // best-efforts to remove watches
-                try {
-                    if (null != zkc && removeFromServer) {
-                        zkc.get().removeWatches(path, this, 
WatcherType.Children, true,
-                                new AsyncCallback.VoidCallback() {
-                            @Override
-                            public void processResult(int rc, String path, 
Object ctx) {
-                                if (KeeperException.Code.OK.intValue() == rc) {
-                                    logger.debug("Successfully removed 
children watches from {}", path);
-                                } else {
-                                    logger.debug("Encountered exception on 
removing children watches from {}",
-                                            path, 
KeeperException.create(KeeperException.Code.get(rc)));
-                                }
-                            }
-                        }, null);
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    logger.debug("Encountered exception on removing watches 
from {}", path, e);
-                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-                    logger.debug("Encountered exception on removing watches 
from {}", path, e);
-                }
                 childWatches.remove(path, watchers);
             }
         }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
index 4ef6a78..fd9e4a7 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
@@ -22,7 +22,7 @@ import static java.time.temporal.ChronoUnit.SECONDS;
 
 import java.time.Duration;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+import org.apache.bookkeeper.tests.containers.wait.ZKWaitStrategy;
 
 /**
  * Test container that runs zookeeper.
@@ -62,11 +62,8 @@ public class ZKContainer<SelfT extends ZKContainer<SelfT>> 
extends MetadataStore
 
     @Override
     public void start() {
-        this.waitStrategy = new HttpWaitStrategy()
-            .forPath("/commands/ruok")
-            .forStatusCode(200)
-            .forPort(ZK_HTTP_PORT)
-            .withStartupTimeout(Duration.of(60, SECONDS));
+        this.waitStrategy = new ZKWaitStrategy(ZK_PORT)
+                .withStartupTimeout(Duration.of(60, SECONDS));
 
         this.withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd.withHostName(HOST_NAME));
 
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
index b0f4e00..8d02596 100644
--- 
a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
@@ -28,6 +28,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import lombok.Cleanup;
+
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -79,21 +81,21 @@ public class BookKeeperClusterUtils {
     }
 
     public static void legacyMetadataFormat(DockerClient docker) throws 
Exception {
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-            zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-        }
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
     }
 
     public static boolean metadataFormatIfNeeded(DockerClient docker, String 
version) throws Exception {
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            if (zk.exists("/ledgers", false) == null) {
-                String bookkeeper = "/opt/bookkeeper/" + version + 
"/bin/bookkeeper";
-                runOnAnyBookie(docker, bookkeeper, "shell", "metaformat", 
"-nonInteractive");
-                return true;
-            } else {
-                return false;
-            }
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        if (zk.exists("/ledgers", false) == null) {
+            String bookkeeper = "/opt/bookkeeper/" + version + 
"/bin/bookkeeper";
+            runOnAnyBookie(docker, bookkeeper, "shell", "metaformat", 
"-nonInteractive");
+            return true;
+        } else {
+            return false;
         }
     }
 
@@ -102,17 +104,18 @@ public class BookKeeperClusterUtils {
                                                      String namespace) throws 
Exception {
         String zkServers = 
BookKeeperClusterUtils.zookeeperConnectString(docker);
         String dlogUri = "distributedlog://" + zkServers + namespace;
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            if (zk.exists(namespace, false) == null) {
-                String dlog = "/opt/bookkeeper/" + version + "/bin/dlog";
-
-                runOnAnyBookie(docker, dlog,
-                    "admin",
-                    "bind",
-                    "-l", "/ledgers",
-                    "-s", zkServers,
-                    "-c", dlogUri);
-            }
+
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        if (zk.exists(namespace, false) == null) {
+            String dlog = "/opt/bookkeeper/" + version + "/bin/dlog";
+
+            runOnAnyBookie(docker, dlog,
+                "admin",
+                "bind",
+                "-l", "/ledgers",
+                "-s", zkServers,
+                "-c", dlogUri);
         }
         return dlogUri;
     }
@@ -169,7 +172,9 @@ public class BookKeeperClusterUtils {
         long timeoutMillis = timeoutUnit.toMillis(timeout);
         long pollMillis = 1000;
         String bookieId = DockerUtils.getContainerIP(docker, containerId) + 
":3181";
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
+        try {
+            @Cleanup
+            ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
             String path = "/ledgers/available/" + bookieId;
             while (timeoutMillis > 0) {
                 if ((zk.exists(path, false) != null) == upOrDown) {

Reply via email to