This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new a9860694c36 [fix][meta] Bookie Info lost by notification race condition. (#20642)
a9860694c36 is described below

commit a9860694c364c2b2d8b1daadee621e4ff87a082b
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Jun 30 09:21:15 2023 +0800

    [fix][meta] Bookie Info lost by notification race condition. (#20642)
---
 .../bookkeeper/PulsarRegistrationClient.java       | 221 +++++---
 .../FaultInjectableZKRegistrationManager.java      | 630 +++++++++++++++++++++
 .../bookkeeper/PulsarRegistrationClientTest.java   | 122 +++-
 3 files changed, 872 insertions(+), 101 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index c5178cd3e24..058d8dfe436 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pulsar.metadata.bookkeeper;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import static org.apache.pulsar.common.util.FutureUtil.Sequencer;
+import static org.apache.pulsar.common.util.FutureUtil.waitForAll;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -42,10 +47,10 @@ import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.CacheGetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Notification;
-import org.apache.pulsar.metadata.api.NotificationType;
 
 @Slf4j
 public class PulsarRegistrationClient implements RegistrationClient {
@@ -56,20 +61,22 @@ public class PulsarRegistrationClient implements RegistrationClient {
     private final String bookieRegistrationPath;
     private final String bookieAllRegistrationPath;
     private final String bookieReadonlyRegistrationPath;
-
-    private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> 
bookieServiceInfoCache =
-                                                                               
     new ConcurrentHashMap();
     private final Set<RegistrationListener> writableBookiesWatchers = new 
CopyOnWriteArraySet<>();
     private final Set<RegistrationListener> readOnlyBookiesWatchers = new 
CopyOnWriteArraySet<>();
     private final MetadataCache<BookieServiceInfo> 
bookieServiceInfoMetadataCache;
     private final ScheduledExecutorService executor;
+    private final Map<BookieId, Versioned<BookieServiceInfo>> 
writableBookieInfo;
+    private final Map<BookieId, Versioned<BookieServiceInfo>> 
readOnlyBookieInfo;
+    private final FutureUtil.Sequencer<Void> sequencer;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
         this.store = store;
         this.ledgersRootPath = ledgersRootPath;
         this.bookieServiceInfoMetadataCache = 
store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
-
+        this.sequencer = Sequencer.create();
+        this.writableBookieInfo = new ConcurrentHashMap<>();
+        this.readOnlyBookieInfo = new ConcurrentHashMap<>();
         // Following Bookie Network Address Changes is an expensive operation
         // as it requires additional ZooKeeper watches
         // we can disable this feature, in case the BK cluster has only
@@ -77,7 +84,6 @@ public class PulsarRegistrationClient implements RegistrationClient {
         this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
         this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + 
"/" + READONLY;
-
         this.executor = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-registration-client"));
 
@@ -91,38 +97,62 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
-        return getChildren(bookieRegistrationPath);
+        return getBookiesThenFreshCache(bookieRegistrationPath);
     }
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
         // this method is meant to return all the known bookies, even the 
bookies
         // that are not in a running state
-        return getChildren(bookieAllRegistrationPath);
+        return getBookiesThenFreshCache(bookieAllRegistrationPath);
     }
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
-        return getChildren(bookieReadonlyRegistrationPath);
+        return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
     }
 
-    private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String 
path) {
+    /**
+     * @throws IllegalArgumentException if parameter path is null or empty.
+     */
+    private CompletableFuture<Versioned<Set<BookieId>>> 
getBookiesThenFreshCache(String path) {
+        if (path == null || path.isEmpty()) {
+            return failedFuture(
+                    new IllegalArgumentException("parameter [path] can not be 
null or empty."));
+        }
         return store.getChildren(path)
                 .thenComposeAsync(children -> {
-                    Set<BookieId> bookieIds = 
PulsarRegistrationClient.convertToBookieAddresses(children);
-                    List<CompletableFuture<?>> bookieInfoUpdated =
-                            new ArrayList<>(bookieIds.size());
+                    final Set<BookieId> bookieIds = 
PulsarRegistrationClient.convertToBookieAddresses(children);
+                    final List<CompletableFuture<?>> bookieInfoUpdated = new 
ArrayList<>(bookieIds.size());
                     for (BookieId id : bookieIds) {
                         // update the cache for new bookies
-                        if (!bookieServiceInfoCache.containsKey(id)) {
-                            
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+                        if (path.equals(bookieReadonlyRegistrationPath) && 
readOnlyBookieInfo.get(id) == null) {
+                            
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+                            continue;
+                        }
+                        if (path.equals(bookieRegistrationPath) && 
writableBookieInfo.get(id) == null) {
+                            
bookieInfoUpdated.add(readBookieInfoAsWritableBookie(id));
+                            continue;
+                        }
+                        if (path.equals(bookieAllRegistrationPath)) {
+                            if (writableBookieInfo.get(id) != null || 
readOnlyBookieInfo.get(id) != null) {
+                                // jump to next bookie id
+                                continue;
+                            }
+                            // check writable first
+                            final CompletableFuture<?> 
revalidateAllBookiesFuture = readBookieInfoAsWritableBookie(id)
+                                    .thenCompose(writableBookieInfo -> 
writableBookieInfo
+                                                
.<CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>>>map(
+                                                        bookieServiceInfo -> 
completedFuture(null))
+                                                // check read-only then
+                                                .orElseGet(() -> 
readBookieInfoAsReadonlyBookie(id)));
+                            bookieInfoUpdated.add(revalidateAllBookiesFuture);
                         }
                     }
                     if (bookieInfoUpdated.isEmpty()) {
-                        return CompletableFuture.completedFuture(bookieIds);
+                        return completedFuture(bookieIds);
                     } else {
-                        return FutureUtil
-                                .waitForAll(bookieInfoUpdated)
+                        return waitForAll(bookieInfoUpdated)
                                 .thenApply(___ -> bookieIds);
                     }
                 })
@@ -153,42 +183,67 @@ public class PulsarRegistrationClient implements RegistrationClient {
         readOnlyBookiesWatchers.remove(registrationListener);
     }
 
-    private void handleDeletedBookieNode(Notification n) {
-        if (n.getType() == NotificationType.Deleted) {
-            BookieId bookieId = stripBookieIdFromPath(n.getPath());
-            if (bookieId != null) {
-                log.info("Bookie {} disappeared", bookieId);
-                bookieServiceInfoCache.remove(bookieId);
-            }
+    /**
+     * This method will receive metadata store notifications and then update 
the
+     * local cache in background sequentially.
+     */
+    private void updatedBookies(Notification n) {
+        // make the notification callback run sequential in background.
+        final String path = n.getPath();
+        if (!path.startsWith(bookieReadonlyRegistrationPath) && 
!path.startsWith(bookieRegistrationPath)) {
+            // ignore unknown path
+            return;
         }
-    }
-
-    private void handleUpdatedBookieNode(Notification n) {
-        BookieId bookieId = stripBookieIdFromPath(n.getPath());
-        if (bookieId != null) {
-            log.info("Bookie {} info updated", bookieId);
-            readBookieServiceInfoAsync(bookieId);
+        if (path.equals(bookieReadonlyRegistrationPath) || 
path.equals(bookieRegistrationPath)) {
+            // ignore root path
+            return;
         }
-    }
-
-    private void updatedBookies(Notification n) {
-        if (n.getType() == NotificationType.Created || n.getType() == 
NotificationType.Deleted) {
-            if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
-                getReadOnlyBookies().thenAccept(bookies -> {
-                    readOnlyBookiesWatchers.forEach(w -> executor.execute(() 
-> w.onBookiesChanged(bookies)));
-                });
-                handleDeletedBookieNode(n);
-            } else if (n.getPath().startsWith(bookieRegistrationPath)) {
-                  getWritableBookies().thenAccept(bookies ->
-                        writableBookiesWatchers.forEach(w -> 
executor.execute(() -> w.onBookiesChanged(bookies))));
-                handleDeletedBookieNode(n);
-            }
-        } else if (n.getType() == NotificationType.Modified) {
-            if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
-                || n.getPath().startsWith(bookieRegistrationPath)) {
-                handleUpdatedBookieNode(n);
+        final BookieId bookieId = stripBookieIdFromPath(n.getPath());
+        sequencer.sequential(() -> {
+            switch (n.getType()) {
+                case Created:
+                    log.info("Bookie {} created. path: {}", bookieId, 
n.getPath());
+                    if (path.startsWith(bookieReadonlyRegistrationPath)) {
+                        return getReadOnlyBookies().thenAccept(bookies ->
+                                readOnlyBookiesWatchers.forEach(w ->
+                                        executor.execute(() -> 
w.onBookiesChanged(bookies))));
+                    }
+                    return getWritableBookies().thenAccept(bookies ->
+                            writableBookiesWatchers.forEach(w ->
+                                    executor.execute(() -> 
w.onBookiesChanged(bookies))));
+                case Modified:
+                    if (bookieId == null) {
+                        return completedFuture(null);
+                    }
+                    log.info("Bookie {} modified. path: {}", bookieId, 
n.getPath());
+                    if (path.startsWith(bookieReadonlyRegistrationPath)) {
+                        return 
readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null);
+                    }
+                    return 
readBookieInfoAsWritableBookie(bookieId).thenApply(__ -> null);
+                case Deleted:
+                    if (bookieId == null) {
+                        return completedFuture(null);
+                    }
+                    log.info("Bookie {} deleted. path: {}", bookieId, 
n.getPath());
+                    if (path.startsWith(bookieReadonlyRegistrationPath)) {
+                        readOnlyBookieInfo.remove(bookieId);
+                        return getReadOnlyBookies().thenAccept(bookies -> {
+                            readOnlyBookiesWatchers.forEach(w ->
+                                    executor.execute(() -> 
w.onBookiesChanged(bookies)));
+                        });
+                    }
+                    if (path.startsWith(bookieRegistrationPath)) {
+                        writableBookieInfo.remove(bookieId);
+                        return getWritableBookies().thenAccept(bookies -> {
+                            writableBookiesWatchers.forEach(w ->
+                                    executor.execute(() -> 
w.onBookiesChanged(bookies)));
+                        });
+                    }
+                    return completedFuture(null);
+                default:
+                    return completedFuture(null);
             }
-        }
+        });
     }
 
     private static BookieId stripBookieIdFromPath(String path) {
@@ -200,7 +255,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
             try {
                 return BookieId.parse(path.substring(slash + 1));
             } catch (IllegalArgumentException e) {
-                log.warn("Cannot decode bookieId from {}", path, e);
+                log.warn("Cannot decode bookieId from {}, error: {}", path, 
e.getMessage());
             }
         }
         return null;
@@ -227,46 +282,48 @@ public class PulsarRegistrationClient implements RegistrationClient {
         // this is because there are a few cases in which some operations on 
the main thread
         // wait for the result. This is due to the fact that resolving the 
address of a bookie
         // is needed in many code paths.
-        Versioned<BookieServiceInfo> resultFromCache = 
bookieServiceInfoCache.get(bookieId);
+        Versioned<BookieServiceInfo> info;
+        if ((info = writableBookieInfo.get(bookieId)) == null) {
+            info = readOnlyBookieInfo.get(bookieId);
+        }
         if (log.isDebugEnabled()) {
-            log.debug("getBookieServiceInfo {} -> {}", bookieId, 
resultFromCache);
+            log.debug("getBookieServiceInfo {} -> {}", bookieId, info);
         }
-        if (resultFromCache != null) {
-            return CompletableFuture.completedFuture(resultFromCache);
+        if (info != null) {
+            return completedFuture(info);
         } else {
             return FutureUtils.exception(new 
BKException.BKBookieHandleNotAvailableException());
         }
     }
 
-    public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId 
bookieId) {
-        String asWritable = bookieRegistrationPath + "/" + bookieId;
-        return bookieServiceInfoMetadataCache.get(asWritable)
-                .thenCompose((Optional<BookieServiceInfo> getResult) -> {
-                    if (getResult.isPresent()) {
-                        Versioned<BookieServiceInfo> res =
-                                new Versioned<>(getResult.get(), new 
LongVersion(-1));
-                        log.info("Update BookieInfoCache (writable bookie) {} 
-> {}", bookieId, getResult.get());
-                        bookieServiceInfoCache.put(bookieId, res);
-                        return CompletableFuture.completedFuture(null);
-                    } else {
-                        return readBookieInfoAsReadonlyBookie(bookieId);
-                    }
-                }
-        );
+    public CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> 
readBookieInfoAsWritableBookie(
+            BookieId bookieId) {
+        final String asWritable = bookieRegistrationPath + "/" + bookieId;
+        return bookieServiceInfoMetadataCache.getWithStats(asWritable)
+                .thenApply((Optional<CacheGetResult<BookieServiceInfo>> 
bkInfoWithStats) -> {
+                            if (bkInfoWithStats.isPresent()) {
+                                final CacheGetResult<BookieServiceInfo> r = 
bkInfoWithStats.get();
+                                log.info("Update BookieInfoCache (writable 
bookie) {} -> {}", bookieId, r.getValue());
+                                writableBookieInfo.put(bookieId,
+                                        new Versioned<>(r.getValue(), new 
LongVersion(r.getStat().getVersion())));
+                            }
+                            return bkInfoWithStats;
+                        }
+                );
     }
 
-    final CompletableFuture<Void> readBookieInfoAsReadonlyBookie(BookieId 
bookieId) {
-        String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
-        return bookieServiceInfoMetadataCache.get(asReadonly)
-                .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) 
-> {
-                    if (getResultAsReadOnly.isPresent()) {
-                        Versioned<BookieServiceInfo> res =
-                                new Versioned<>(getResultAsReadOnly.get(), new 
LongVersion(-1));
-                        log.info("Update BookieInfoCache (readonly bookie) {} 
-> {}", bookieId,
-                                getResultAsReadOnly.get());
-                        bookieServiceInfoCache.put(bookieId, res);
+    final CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> 
readBookieInfoAsReadonlyBookie(
+            BookieId bookieId) {
+        final String asReadonly = bookieReadonlyRegistrationPath + "/" + 
bookieId;
+        return bookieServiceInfoMetadataCache.getWithStats(asReadonly)
+                .thenApply((Optional<CacheGetResult<BookieServiceInfo>> 
bkInfoWithStats) -> {
+                    if (bkInfoWithStats.isPresent()) {
+                        final CacheGetResult<BookieServiceInfo> r = 
bkInfoWithStats.get();
+                        log.info("Update BookieInfoCache (readonly bookie) {} 
-> {}", bookieId, r.getValue());
+                        readOnlyBookieInfo.put(bookieId,
+                                new Versioned<>(r.getValue(), new 
LongVersion(r.getStat().getVersion())));
                     }
-                    return null;
+                    return bkInfoWithStats;
                 });
     }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java
new file mode 100644
index 00000000000..bcbf41addba
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.metadata.bookkeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
+import org.apache.bookkeeper.bookie.BookieException.CookieExistException;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
+import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.BKInterruptedException;
+import org.apache.bookkeeper.client.BKException.MetaStoreException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationClient;
+import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
+import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.ZkLayoutManager;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Fault injectable ZK registration manager.
+ * Copy from #{@link org.apache.bookkeeper.discover.ZKRegistrationManager}.
+ */
+@Slf4j
+public class FaultInjectableZKRegistrationManager implements 
RegistrationManager {
+
+    private static final Function<Throwable, BKException> EXCEPTION_FUNC = 
cause -> {
+        if (cause instanceof BKException) {
+            log.error("Failed to get bookie list : ", cause);
+            return (BKException) cause;
+        } else if (cause instanceof InterruptedException) {
+            log.error("Interrupted reading bookie list : ", cause);
+            return new BKInterruptedException();
+        } else {
+            return new MetaStoreException();
+        }
+    };
+
+    private final ServerConfiguration conf;
+    private final ZooKeeper zk;
+    private final List<ACL> zkAcls;
+    private final LayoutManager layoutManager;
+
+    private volatile boolean zkRegManagerInitialized = false;
+
+    // ledgers root path
+    private final String ledgersRootPath;
+    // cookie path
+    private final String cookiePath;
+    // registration paths
+    protected final String bookieRegistrationPath;
+    protected final String bookieReadonlyRegistrationPath;
+    // session timeout in milliseconds
+    private final int zkTimeoutMs;
+    private final List<RegistrationListener> listeners = new ArrayList<>();
+    private Function<Void, Void> hookOnRegisterReadOnly;
+
+    public FaultInjectableZKRegistrationManager(ServerConfiguration conf,
+                                                ZooKeeper zk) {
+        this(conf, zk, ZKMetadataDriverBase.resolveZkLedgersRootPath(conf));
+    }
+
+    public FaultInjectableZKRegistrationManager(ServerConfiguration conf,
+                                                ZooKeeper zk,
+                                                String ledgersRootPath) {
+        this.conf = conf;
+        this.zk = zk;
+        this.zkAcls = ZkUtils.getACLs(conf);
+        this.ledgersRootPath = ledgersRootPath;
+        this.cookiePath = ledgersRootPath + "/" + COOKIE_NODE;
+        this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
+        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + 
"/" + READONLY;
+        this.zkTimeoutMs = conf.getZkTimeout();
+
+        this.layoutManager = new ZkLayoutManager(
+                zk,
+                ledgersRootPath,
+                zkAcls);
+
+        this.zk.register(event -> {
+            if (!zkRegManagerInitialized) {
+                // do nothing until first registration
+                return;
+            }
+            // Check for expired connection.
+            if (event.getType().equals(EventType.None)
+                    && event.getState().equals(KeeperState.Expired)) {
+                listeners.forEach(RegistrationListener::onRegistrationExpired);
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+
+    /**
+     * Returns the CookiePath of the bookie in the ZooKeeper.
+     *
+     * @param bookieId bookie id
+     * @return
+     */
+    public String getCookiePath(BookieId bookieId) {
+        return this.cookiePath + "/" + bookieId;
+    }
+
+    //
+    // Registration Management
+    //
+
+    /**
+     * Check existence of <i>regPath</i> and wait it expired if possible.
+     *
+     * @param regPath reg node path.
+     * @return true if regPath exists, otherwise return false
+     * @throws IOException if can't create reg path
+     */
+    protected boolean checkRegNodeAndWaitExpired(String regPath) throws 
IOException {
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        Watcher zkPrevRegNodewatcher = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // Check for prev znode deletion. Connection expiration is
+                // not handling, since bookie has logic to shutdown.
+                if (EventType.NodeDeleted == event.getType()) {
+                    prevNodeLatch.countDown();
+                }
+            }
+        };
+        try {
+            Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
+            if (null != stat) {
+                // if the ephemeral owner isn't current zookeeper client
+                // wait for it to be expired.
+                if (stat.getEphemeralOwner() != zk.getSessionId()) {
+                    log.info("Previous bookie registration znode: {} exists, 
so waiting zk sessiontimeout:"
+                            + " {} ms for znode deletion", regPath, 
zkTimeoutMs);
+                    // waiting for the previous bookie reg znode deletion
+                    if (!prevNodeLatch.await(zkTimeoutMs, 
TimeUnit.MILLISECONDS)) {
+                        throw new NodeExistsException(regPath);
+                    } else {
+                        return false;
+                    }
+                }
+                return true;
+            } else {
+                return false;
+            }
+        } catch (KeeperException ke) {
+            log.error("ZK exception checking and wait ephemeral znode {} 
expired : ", regPath, ke);
+            throw new IOException("ZK exception checking and wait ephemeral 
znode "
+                    + regPath + " expired", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            log.error("Interrupted checking and wait ephemeral znode {} 
expired : ", regPath, ie);
+            throw new IOException("Interrupted checking and wait ephemeral 
znode "
+                    + regPath + " expired", ie);
+        }
+    }
+
+    @Override
+    public void registerBookie(BookieId bookieId, boolean readOnly,
+                               BookieServiceInfo bookieServiceInfo) throws 
BookieException {
+        if (!readOnly) {
+            String regPath = bookieRegistrationPath + "/" + bookieId;
+            doRegisterBookie(regPath, bookieServiceInfo);
+        } else {
+            doRegisterReadOnlyBookie(bookieId, bookieServiceInfo);
+        }
+    }
+
+    @VisibleForTesting
+    static byte[] serializeBookieServiceInfo(BookieServiceInfo 
bookieServiceInfo) {
+        if (log.isDebugEnabled()) {
+            log.debug("serialize BookieServiceInfo {}", bookieServiceInfo);
+        }
+        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+            BookieServiceInfoFormat.Builder builder = 
BookieServiceInfoFormat.newBuilder();
+            List<BookieServiceInfoFormat.Endpoint> bsiEndpoints = 
bookieServiceInfo.getEndpoints().stream()
+                    .map(e -> {
+                        return BookieServiceInfoFormat.Endpoint.newBuilder()
+                                .setId(e.getId())
+                                .setPort(e.getPort())
+                                .setHost(e.getHost())
+                                .setProtocol(e.getProtocol())
+                                .addAllAuth(e.getAuth())
+                                .addAllExtensions(e.getExtensions())
+                                .build();
+                    })
+                    .collect(Collectors.toList());
+
+            builder.addAllEndpoints(bsiEndpoints);
+            builder.putAllProperties(bookieServiceInfo.getProperties());
+
+            builder.build().writeTo(os);
+            return os.toByteArray();
+        } catch (IOException err) {
+            log.error("Cannot serialize bookieServiceInfo from " + 
bookieServiceInfo);
+            throw new RuntimeException(err);
+        }
+    }
+
+    /**
+     * Creates the ephemeral registration znode at {@code regPath}, holding the serialized service info.
+     * The node is only created when {@code checkRegNodeAndWaitExpired(regPath)} returns false.
+     *
+     * @param regPath full path of the registration znode
+     * @param bookieServiceInfo service info stored as the znode data
+     * @throws BookieException a MetadataStoreException wrapping any ZooKeeper, interrupt or I/O failure
+     */
+    private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException {
+        // ZK ephemeral node for this Bookie.
+        try {
+            if (!checkRegNodeAndWaitExpired(regPath)) {
+                // Create the ZK ephemeral node for this Bookie.
+                zk.create(regPath, serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL);
+                zkRegManagerInitialized = true;
+            }
+        } catch (KeeperException ke) {
+            log.error("ZK exception registering ephemeral Znode for Bookie!", ke);
+            // Propagate as a MetadataStoreException; callers are expected to treat a failed
+            // registration as fatal.
+            throw new MetadataStoreException(ke);
+        } catch (InterruptedException ie) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            log.error("Interrupted exception registering ephemeral Znode for Bookie!", ie);
+            throw new MetadataStoreException(ie);
+        } catch (IOException e) {
+            log.error("I/O exception registering ephemeral Znode for Bookie!", e);
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    /**
+     * Moves a bookie into read-only mode. It runs these steps in order:
+     * <ol>
+     *   <li>ensures the read-only parent node exists;</li>
+     *   <li>registers the bookie's ephemeral node under that parent;</li>
+     *   <li>invokes the optional test hook;</li>
+     *   <li>deletes the bookie's writable registration node.</li>
+     * </ol>
+     * The hook runs while both registration nodes exist, so tests can stretch that window to
+     * simulate network delay.
+     *
+     * @throws BookieException a MetadataStoreException wrapping ZooKeeper or interrupt failures
+     */
+    private void doRegisterReadOnlyBookie(BookieId bookieId, BookieServiceInfo bookieServiceInfo)
+            throws BookieException {
+        try {
+            if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
+                try {
+                    zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo),
+                            zkAcls, CreateMode.PERSISTENT);
+                } catch (NodeExistsException ignored) {
+                    // this node is just now created by someone.
+                }
+            }
+            String regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+            doRegisterBookie(regPath, bookieServiceInfo);
+            // clear the write state
+            regPath = bookieRegistrationPath + "/" + bookieId;
+            try {
+                if (hookOnRegisterReadOnly != null) {
+                    hookOnRegisterReadOnly.apply(null);
+                }
+                // Clear the current registered node
+                zk.delete(regPath, -1);
+            } catch (KeeperException.NoNodeException nne) {
+                log.warn("No writable bookie registered node {} when transitioning to readonly",
+                        regPath, nne);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag; the previous multi-catch silently cleared it.
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException(e);
+        } catch (KeeperException e) {
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    /**
+     * Deletes the registration znode of {@code bookieId}. The node is removed from the read-only path
+     * when {@code readOnly} is true, otherwise from the writable path.
+     */
+    @Override
+    public void unregisterBookie(BookieId bookieId, boolean readOnly) throws BookieException {
+        final String parentPath = readOnly ? bookieReadonlyRegistrationPath : bookieRegistrationPath;
+        doUnregisterBookie(parentPath + "/" + bookieId);
+    }
+
+    /**
+     * Deletes {@code regPath} whatever its version is. ZooKeeper failures are wrapped in a
+     * MetadataStoreException, and the interrupt flag is restored when the thread is interrupted.
+     */
+    private void doUnregisterBookie(String regPath) throws BookieException {
+        final int anyVersion = -1;
+        try {
+            zk.delete(regPath, anyVersion);
+        } catch (KeeperException keeperError) {
+            throw new MetadataStoreException(keeperError);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException(interrupted);
+        }
+    }
+
+    //
+    // Cookie Management
+    //
+
+    /**
+     * Writes the cookie of {@code bookieId}. How it writes depends on the cookie version:
+     * <ul>
+     *   <li>{@code Version.NEW} creates the cookie node, creating the cookie parent first if needed.</li>
+     *   <li>A {@link LongVersion} does a versioned {@code setData}.</li>
+     * </ul>
+     *
+     * @throws BookieException CookieExistException / CookieNotFoundException on node conflicts,
+     *         BookieIllegalOpException for a non-LongVersion update, or MetadataStoreException otherwise
+     */
+    @Override
+    public void writeCookie(BookieId bookieId,
+                            Versioned<byte[]> cookieData) throws BookieException {
+        String zkPath = getCookiePath(bookieId);
+        try {
+            if (Version.NEW == cookieData.getVersion()) {
+                if (zk.exists(cookiePath, false) == null) {
+                    try {
+                        zk.create(cookiePath, new byte[0], zkAcls, CreateMode.PERSISTENT);
+                    } catch (NodeExistsException nne) {
+                        log.info("More than one bookie tried to create {} at once. Safe to ignore.",
+                                cookiePath);
+                    }
+                }
+                zk.create(zkPath, cookieData.getValue(), zkAcls, CreateMode.PERSISTENT);
+            } else {
+                if (!(cookieData.getVersion() instanceof LongVersion)) {
+                    throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
+                }
+                // Versioned update: ZooKeeper rejects it if the cookie changed since it was read.
+                zk.setData(
+                        zkPath,
+                        cookieData.getValue(),
+                        (int) ((LongVersion) cookieData.getVersion()).getLongVersion());
+            }
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Interrupted writing cookie for bookie " + bookieId, ie);
+        } catch (NoNodeException nne) {
+            throw new CookieNotFoundException(bookieId.toString());
+        } catch (NodeExistsException nee) {
+            throw new CookieExistException(bookieId.toString());
+        } catch (KeeperException e) {
+            // Keep the ZooKeeper cause; it was previously dropped.
+            throw new MetadataStoreException("Failed to write cookie for bookie " + bookieId, e);
+        }
+    }
+
+    /**
+     * Reads the cookie of {@code bookieId} together with its ZooKeeper version.
+     *
+     * @return the cookie bytes, versioned with the znode's data version
+     * @throws BookieException CookieNotFoundException if the cookie node is absent, MetadataStoreException
+     *         for other ZooKeeper or interrupt failures
+     */
+    @Override
+    public Versioned<byte[]> readCookie(BookieId bookieId) throws BookieException {
+        String zkPath = getCookiePath(bookieId);
+        try {
+            // getData fills the Stat itself. A separate exists() call is racy: the node can appear
+            // after exists() returned null, and that null Stat would then cause an NPE below.
+            Stat stat = new Stat();
+            byte[] data = zk.getData(zkPath, false, stat);
+            // sets stat version from ZooKeeper
+            LongVersion version = new LongVersion(stat.getVersion());
+            return new Versioned<>(data, version);
+        } catch (NoNodeException nne) {
+            throw new CookieNotFoundException(bookieId.toString());
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId, ie);
+        } catch (KeeperException e) {
+            throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId, e);
+        }
+    }
+
+    /**
+     * Deletes the cookie of {@code bookieId} only if its znode still has the given version.
+     *
+     * @param version expected cookie version; must be a {@link LongVersion}
+     * @throws BookieException BookieIllegalOpException for a non-LongVersion, CookieNotFoundException
+     *         if the cookie is absent, MetadataStoreException for other failures
+     */
+    @Override
+    public void removeCookie(BookieId bookieId, Version version) throws BookieException {
+        // Validate the version type the same way writeCookie does, instead of risking a ClassCastException.
+        if (!(version instanceof LongVersion)) {
+            throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
+        }
+        String zkPath = getCookiePath(bookieId);
+        try {
+            zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
+        } catch (NoNodeException e) {
+            throw new CookieNotFoundException(bookieId.toString());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Interrupted deleting cookie for bookie " + bookieId, e);
+        } catch (KeeperException e) {
+            throw new MetadataStoreException("Failed to delete cookie for bookie " + bookieId, e);
+        }
+
+        log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId);
+    }
+
+
+    /**
+     * Returns the cluster instance id stored under {@code <ledgersRoot>/INSTANCEID}.
+     *
+     * @return the instance id, or {@code null} if the INSTANCEID node does not exist
+     * @throws BookieException MetadataStoreException if the ledgers root is missing or ZooKeeper fails
+     */
+    @Override
+    public String getClusterInstanceId() throws BookieException {
+        String instanceId = null;
+        try {
+            if (zk.exists(ledgersRootPath, false) == null) {
+                log.error("BookKeeper metadata doesn't exist in zookeeper. "
+                        + "Has the cluster been initialized? "
+                        + "Try running bin/bookkeeper shell metaformat");
+                throw new KeeperException.NoNodeException("BookKeeper metadata");
+            }
+            try {
+                byte[] data = zk.getData(ledgersRootPath + "/"
+                        + INSTANCEID, false, null);
+                instanceId = new String(data, UTF_8);
+            } catch (KeeperException.NoNodeException e) {
+                log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of silently clearing it.
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Failed to get cluster instance id", e);
+        } catch (KeeperException e) {
+            throw new MetadataStoreException("Failed to get cluster instance id", e);
+        }
+        return instanceId;
+    }
+
+    /**
+     * Makes sure the ledgers root, the available-bookies node and the read-only bookies node exist,
+     * creating whichever are missing.
+     *
+     * @return whether the ledgers root path already existed before this call
+     */
+    @Override
+    public boolean prepareFormat() throws Exception {
+        final boolean ledgerRootExists = zk.exists(ledgersRootPath, false) != null;
+        final boolean availableNodeExists = zk.exists(bookieRegistrationPath, false) != null;
+        final byte[] emptyData = "".getBytes(StandardCharsets.UTF_8);
+
+        if (!ledgerRootExists) {
+            ZkUtils.createFullPathOptimistic(zk, ledgersRootPath, emptyData, zkAcls, CreateMode.PERSISTENT);
+        }
+        if (!availableNodeExists) {
+            zk.create(bookieRegistrationPath, "".getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+        }
+
+        final boolean readonlyNodeExists = zk.exists(bookieReadonlyRegistrationPath, false) != null;
+        if (!readonlyNodeExists) {
+            zk.create(bookieReadonlyRegistrationPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
+        }
+        return ledgerRootExists;
+    }
+
+    /**
+     * Initializes metadata for a brand-new cluster. It creates these nodes with a single
+     * {@code zk.multi} call and then creates the ledger layout:
+     * <ul>
+     *   <li>the ledgers root;</li>
+     *   <li>the writable and read-only registration parents;</li>
+     *   <li>a random INSTANCEID.</li>
+     * </ul>
+     *
+     * @return false if the ledgers root already exists, true after successful initialization
+     */
+    @Override
+    public boolean initNewCluster() throws Exception {
+        final String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
+        final String instanceIdPath = ledgersRootPath + "/" + INSTANCEID;
+        log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", zkServers,
+                ledgersRootPath);
+
+        if (zk.exists(ledgersRootPath, false) != null) {
+            log.error("Ledger root path: {} already exists", ledgersRootPath);
+            return false;
+        }
+
+        final String instanceId = UUID.randomUUID().toString();
+        // Root node, writable bookies node, read-only bookies node, then INSTANCEID.
+        final List<Op> multiOps = Lists.newArrayList(
+                Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT),
+                Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT),
+                Op.create(bookieReadonlyRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT),
+                Op.create(instanceIdPath, instanceId.getBytes(UTF_8), zkAcls, CreateMode.PERSISTENT));
+        zk.multi(multiOps);
+
+        // creates the new layout and stores in zookeeper
+        AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+
+        log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", zkServers,
+                ledgersRootPath, instanceId);
+        return true;
+    }
+
+    /**
+     * Deletes the metadata of an existing cluster. It refuses to do so while any writable or
+     * read-only bookie is still registered.
+     *
+     * @return true if there was nothing to nuke, otherwise the ledger manager factory's result;
+     *         false when bookies are still registered
+     */
+    @Override
+    public boolean nukeExistingCluster() throws Exception {
+        final String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
+        log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}",
+                zkServers, ledgersRootPath);
+
+        if (zk.exists(ledgersRootPath, false) == null) {
+            log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, "
+                    + "so exiting nuke operation", ledgersRootPath, zkServers);
+            return true;
+        }
+
+        final boolean availableNodeExists = zk.exists(bookieRegistrationPath, false) != null;
+        try (RegistrationClient regClient = new ZKRegistrationClient(zk, ledgersRootPath, null, false)) {
+            if (availableNodeExists && hasRegisteredBookies(regClient)) {
+                return false;
+            }
+        }
+
+        return AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager)
+                .validateAndNukeExistingCluster(conf, layoutManager);
+    }
+
+    // Logs an error and returns true if any writable (or, when its node exists, read-only) bookie is registered.
+    private boolean hasRegisteredBookies(RegistrationClient regClient) throws Exception {
+        Collection<BookieId> rwBookies = FutureUtils
+                .result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue();
+        if (rwBookies != null && !rwBookies.isEmpty()) {
+            log.error("Bookies are still up and connected to this cluster, "
+                    + "stop all bookies before nuking the cluster");
+            return true;
+        }
+        if (zk.exists(bookieReadonlyRegistrationPath, false) == null) {
+            return false;
+        }
+        Collection<BookieId> roBookies = FutureUtils
+                .result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
+        if (roBookies != null && !roBookies.isEmpty()) {
+            log.error("Readonly Bookies are still up and connected to this cluster, "
+                    + "stop all bookies before nuking the cluster");
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Formats BookKeeper metadata. It clears these nodes, ignoring any that are already absent:
+     * <ul>
+     *   <li>the under-replication tree and its locks;</li>
+     *   <li>the cookies;</li>
+     *   <li>the INSTANCEID node.</li>
+     * </ul>
+     * It then writes a fresh random INSTANCEID.
+     *
+     * @return always true on success
+     */
+    @Override
+    public boolean format() throws Exception {
+        final String underReplicationBase = ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath);
+
+        // Clear underreplicated ledgers
+        deleteTreeIfExists(underReplicationBase + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH,
+                "underreplicated ledgers root path node not exists in zookeeper to delete");
+        // Clear underreplicatedledger locks
+        deleteTreeIfExists(underReplicationBase + "/" + BookKeeperConstants.UNDER_REPLICATION_LOCK,
+                "underreplicatedledger locks node not exists in zookeeper to delete");
+        // Clear the cookies
+        deleteTreeIfExists(cookiePath, "cookies node not exists in zookeeper to delete");
+
+        final String instanceIdPath = ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID;
+        // Replace the INSTANCEID with a freshly generated one
+        try {
+            zk.delete(instanceIdPath, -1);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("INSTANCEID not exists in zookeeper to delete");
+            }
+        }
+        final String instanceId = UUID.randomUUID().toString();
+        zk.create(instanceIdPath, instanceId.getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+
+        log.info("Successfully formatted BookKeeper metadata");
+        return true;
+    }
+
+    // Recursively deletes the subtree at path; logs debugMsg at debug level when the node does not exist.
+    private void deleteTreeIfExists(String path, String debugMsg) throws KeeperException, InterruptedException {
+        try {
+            ZKUtil.deleteRecursive(zk, path);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug(debugMsg);
+            }
+        }
+    }
+
+    /**
+     * Checks whether {@code bookieId} has a registration znode under either the writable or the
+     * read-only path. The writable path is checked first.
+     *
+     * @throws BookieException MetadataStoreException on ZooKeeper failure or interruption
+     */
+    @Override
+    public boolean isBookieRegistered(BookieId bookieId) throws BookieException {
+        final String writablePath = bookieRegistrationPath + "/" + bookieId;
+        final String readonlyPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+        try {
+            if (zk.exists(writablePath, false) != null) {
+                return true;
+            }
+            return zk.exists(readonlyPath, false) != null;
+        } catch (KeeperException e) {
+            log.error("ZK exception while checking registration ephemeral znodes for BookieId: {}", bookieId, e);
+            throw new MetadataStoreException(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("InterruptedException while checking registration ephemeral znodes for BookieId: {}",
+                    bookieId, e);
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    /**
+     * Adds {@code listener} to this manager's {@code listeners} collection.
+     */
+    @Override
+    public void addRegistrationListener(RegistrationListener listener) {
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Test hook: installs a function that the read-only registration path calls with a {@code null}
+     * argument. It runs after the read-only node is registered and before the writable node is deleted.
+     * Passing {@code null} disables the hook.
+     */
+    public void betweenRegisterReadOnlyBookie(Function<Void, Void> fn) {
+        this.hookOnRegisterReadOnly = fn;
+    }
+}
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 409a6724381..c4f56915119 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -43,9 +43,8 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
-import org.apache.bookkeeper.discover.RegistrationClient;
-import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.*;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.versioning.Version;
@@ -53,6 +52,7 @@ import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.pulsar.metadata.BaseMetadataStoreTest;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
@@ -169,10 +169,10 @@ public class PulsarRegistrationClientTest extends 
BaseMetadataStoreTest {
         getAndVerifyAllBookies(rc, addresses);
 
         Awaitility.await().untilAsserted(() -> {
-        for (BookieId address : addresses) {
-            BookieServiceInfo bookieServiceInfo = 
rc.getBookieServiceInfo(address).get().getValue();
-            compareBookieServiceInfo(bookieServiceInfo, 
bookieServiceInfos.get(address));
-        }});
+            for (BookieId address : addresses) {
+                BookieServiceInfo bookieServiceInfo = 
rc.getBookieServiceInfo(address).get().getValue();
+                compareBookieServiceInfo(bookieServiceInfo, 
bookieServiceInfos.get(address));
+            }});
 
         // shutdown the bookies (but keep the cookie)
         for (BookieId address : addresses) {
@@ -185,12 +185,12 @@ public class PulsarRegistrationClientTest extends 
BaseMetadataStoreTest {
 
         // getBookieServiceInfo should fail with 
BKBookieHandleNotAvailableException
         Awaitility.await().untilAsserted(() -> {
-        for (BookieId address : addresses) {
-            assertTrue(
-                expectThrows(ExecutionException.class, () -> {
-                    rc.getBookieServiceInfo(address).get();
-            }).getCause() instanceof 
BKException.BKBookieHandleNotAvailableException);
-        }});
+            for (BookieId address : addresses) {
+                assertTrue(
+                        expectThrows(ExecutionException.class, () -> {
+                            rc.getBookieServiceInfo(address).get();
+                        }).getCause() instanceof 
BKException.BKBookieHandleNotAvailableException);
+            }});
 
 
         // restart the bookies, all writable
@@ -242,12 +242,12 @@ public class PulsarRegistrationClientTest extends 
BaseMetadataStoreTest {
                 .await()
                 .ignoreExceptionsMatching(e -> e.getCause() instanceof 
BKException.BKBookieHandleNotAvailableException)
                 .untilAsserted(() -> {
-            // verify that infos are updated
-            for (BookieId address : addresses) {
-                BookieServiceInfo bookieServiceInfo = 
rc.getBookieServiceInfo(address).get().getValue();
-                compareBookieServiceInfo(bookieServiceInfo, 
bookieServiceInfos.get(address));
-            }
-        });
+                    // verify that infos are updated
+                    for (BookieId address : addresses) {
+                        BookieServiceInfo bookieServiceInfo = 
rc.getBookieServiceInfo(address).get().getValue();
+                        compareBookieServiceInfo(bookieServiceInfo, 
bookieServiceInfos.get(address));
+                    }
+                });
 
     }
 
@@ -358,4 +358,88 @@ public class PulsarRegistrationClientTest extends 
BaseMetadataStoreTest {
         });
     }
 
+
+    /**
+     * Checks that bookie service info stays visible while bookies move to read-only mode. Each
+     * transition is slowed by a 1s pause between creating the read-only node and deleting the
+     * writable node. Both registration clients must still return the original info for every bookie.
+     */
+    @Test
+    public void testNetworkDelayWithBkZkManager() throws Throwable {
+        final String zksConnectionString = zks.getConnectionString();
+        final String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
+        // prepare registration manager; @Cleanup closes the raw ZooKeeper handle, which previously leaked
+        @Cleanup
+        ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null);
+        final ServerConfiguration serverConfiguration = new ServerConfiguration();
+        serverConfiguration.setZkLedgersRootPath(ledgersRoot);
+        final FaultInjectableZKRegistrationManager rm =
+                new FaultInjectableZKRegistrationManager(serverConfiguration, zk);
+        rm.prepareFormat();
+        // prepare registration client
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(zksConnectionString,
+                MetadataStoreConfig.builder().build());
+        @Cleanup
+        RegistrationClient rc1 = new PulsarRegistrationClient(store, ledgersRoot);
+        @Cleanup
+        RegistrationClient rc2 = new PulsarRegistrationClient(store, ledgersRoot);
+
+        final List<BookieId> addresses = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            addresses.add(BookieId.parse("BOOKIE-" + i));
+        }
+        final Map<BookieId, BookieServiceInfo> bookieServiceInfos = new HashMap<>();
+
+        int port = 223;
+        for (BookieId address : addresses) {
+            BookieServiceInfo info = new BookieServiceInfo();
+            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+            endpoint.setAuth(Collections.emptyList());
+            endpoint.setExtensions(Collections.emptyList());
+            endpoint.setId("id");
+            endpoint.setHost("localhost");
+            endpoint.setPort(port++);
+            endpoint.setProtocol("bookie-rpc");
+            info.setEndpoints(Arrays.asList(endpoint));
+            bookieServiceInfos.put(address, info);
+            rm.registerBookie(address, false, info);
+            // write the cookie
+            rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW));
+        }
+
+        // trigger loading the BookieServiceInfo in the local cache
+        getAndVerifyAllBookies(rc1, addresses);
+        getAndVerifyAllBookies(rc2, addresses);
+
+        // verify the initial status
+        Awaitility.await().untilAsserted(() -> {
+            for (BookieId address : addresses) {
+                compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+                compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+            }
+        });
+
+        // mock network delay: pause between registering the read-only node and deleting the writable one
+        rm.betweenRegisterReadOnlyBookie(__ -> {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return null;
+        });
+
+        for (int i = 0; i < addresses.size() / 2; i++) {
+            final BookieId bkId = addresses.get(i);
+            // turn some bookies to be read only.
+            rm.registerBookie(bkId, true, bookieServiceInfos.get(bkId));
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            for (BookieId address : addresses) {
+                compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+                compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+            }
+        });
+    }
 }

Reply via email to