This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new a9860694c36 [fix][meta] Bookie Info lost by notification race
condition. (#20642)
a9860694c36 is described below
commit a9860694c364c2b2d8b1daadee621e4ff87a082b
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Jun 30 09:21:15 2023 +0800
[fix][meta] Bookie Info lost by notification race condition. (#20642)
---
.../bookkeeper/PulsarRegistrationClient.java | 221 +++++---
.../FaultInjectableZKRegistrationManager.java | 630 +++++++++++++++++++++
.../bookkeeper/PulsarRegistrationClientTest.java | 122 +++-
3 files changed, 872 insertions(+), 101 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index c5178cd3e24..058d8dfe436 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -18,13 +18,18 @@
*/
package org.apache.pulsar.metadata.bookkeeper;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import static org.apache.pulsar.common.util.FutureUtil.Sequencer;
+import static org.apache.pulsar.common.util.FutureUtil.waitForAll;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -42,10 +47,10 @@ import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
-import org.apache.pulsar.metadata.api.NotificationType;
@Slf4j
public class PulsarRegistrationClient implements RegistrationClient {
@@ -56,20 +61,22 @@ public class PulsarRegistrationClient implements
RegistrationClient {
private final String bookieRegistrationPath;
private final String bookieAllRegistrationPath;
private final String bookieReadonlyRegistrationPath;
-
- private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>>
bookieServiceInfoCache =
-
new ConcurrentHashMap();
private final Set<RegistrationListener> writableBookiesWatchers = new
CopyOnWriteArraySet<>();
private final Set<RegistrationListener> readOnlyBookiesWatchers = new
CopyOnWriteArraySet<>();
private final MetadataCache<BookieServiceInfo>
bookieServiceInfoMetadataCache;
private final ScheduledExecutorService executor;
+ private final Map<BookieId, Versioned<BookieServiceInfo>>
writableBookieInfo;
+ private final Map<BookieId, Versioned<BookieServiceInfo>>
readOnlyBookieInfo;
+ private final FutureUtil.Sequencer<Void> sequencer;
public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.ledgersRootPath = ledgersRootPath;
this.bookieServiceInfoMetadataCache =
store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
-
+ this.sequencer = Sequencer.create();
+ this.writableBookieInfo = new ConcurrentHashMap<>();
+ this.readOnlyBookieInfo = new ConcurrentHashMap<>();
// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
// we can disable this feature, in case the BK cluster has only
@@ -77,7 +84,6 @@ public class PulsarRegistrationClient implements
RegistrationClient {
this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath +
"/" + READONLY;
-
this.executor = Executors
.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-registration-client"));
@@ -91,38 +97,62 @@ public class PulsarRegistrationClient implements
RegistrationClient {
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
- return getChildren(bookieRegistrationPath);
+ return getBookiesThenFreshCache(bookieRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
// this method is meant to return all the known bookies, even the
bookies
// that are not in a running state
- return getChildren(bookieAllRegistrationPath);
+ return getBookiesThenFreshCache(bookieAllRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
- return getChildren(bookieReadonlyRegistrationPath);
+ return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
}
- private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String
path) {
+ /**
+ * Returns a future failed with IllegalArgumentException if parameter path is null or empty.
+ */
+ private CompletableFuture<Versioned<Set<BookieId>>>
getBookiesThenFreshCache(String path) {
+ if (path == null || path.isEmpty()) {
+ return failedFuture(
+ new IllegalArgumentException("parameter [path] can not be
null or empty."));
+ }
return store.getChildren(path)
.thenComposeAsync(children -> {
- Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
- List<CompletableFuture<?>> bookieInfoUpdated =
- new ArrayList<>(bookieIds.size());
+ final Set<BookieId> bookieIds =
PulsarRegistrationClient.convertToBookieAddresses(children);
+ final List<CompletableFuture<?>> bookieInfoUpdated = new
ArrayList<>(bookieIds.size());
for (BookieId id : bookieIds) {
// update the cache for new bookies
- if (!bookieServiceInfoCache.containsKey(id)) {
-
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+ if (path.equals(bookieReadonlyRegistrationPath) &&
readOnlyBookieInfo.get(id) == null) {
+
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
+ continue;
+ }
+ if (path.equals(bookieRegistrationPath) &&
writableBookieInfo.get(id) == null) {
+
bookieInfoUpdated.add(readBookieInfoAsWritableBookie(id));
+ continue;
+ }
+ if (path.equals(bookieAllRegistrationPath)) {
+ if (writableBookieInfo.get(id) != null ||
readOnlyBookieInfo.get(id) != null) {
+ // jump to next bookie id
+ continue;
+ }
+ // check writable first
+ final CompletableFuture<?>
revalidateAllBookiesFuture = readBookieInfoAsWritableBookie(id)
+ .thenCompose(writableBookieInfo ->
writableBookieInfo
+
.<CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>>>map(
+ bookieServiceInfo ->
completedFuture(null))
+ // check read-only then
+ .orElseGet(() ->
readBookieInfoAsReadonlyBookie(id)));
+ bookieInfoUpdated.add(revalidateAllBookiesFuture);
}
}
if (bookieInfoUpdated.isEmpty()) {
- return CompletableFuture.completedFuture(bookieIds);
+ return completedFuture(bookieIds);
} else {
- return FutureUtil
- .waitForAll(bookieInfoUpdated)
+ return waitForAll(bookieInfoUpdated)
.thenApply(___ -> bookieIds);
}
})
@@ -153,42 +183,67 @@ public class PulsarRegistrationClient implements
RegistrationClient {
readOnlyBookiesWatchers.remove(registrationListener);
}
- private void handleDeletedBookieNode(Notification n) {
- if (n.getType() == NotificationType.Deleted) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
- if (bookieId != null) {
- log.info("Bookie {} disappeared", bookieId);
- bookieServiceInfoCache.remove(bookieId);
- }
+ /**
+ * This method will receive metadata store notifications and then update
the
+ * local cache in background sequentially.
+ */
+ private void updatedBookies(Notification n) {
+ // Make the notification callbacks run sequentially in the background.
+ final String path = n.getPath();
+ if (!path.startsWith(bookieReadonlyRegistrationPath) &&
!path.startsWith(bookieRegistrationPath)) {
+ // ignore unknown path
+ return;
}
- }
-
- private void handleUpdatedBookieNode(Notification n) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
- if (bookieId != null) {
- log.info("Bookie {} info updated", bookieId);
- readBookieServiceInfoAsync(bookieId);
+ if (path.equals(bookieReadonlyRegistrationPath) ||
path.equals(bookieRegistrationPath)) {
+ // ignore root path
+ return;
}
- }
-
- private void updatedBookies(Notification n) {
- if (n.getType() == NotificationType.Created || n.getType() ==
NotificationType.Deleted) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
- getReadOnlyBookies().thenAccept(bookies -> {
- readOnlyBookiesWatchers.forEach(w -> executor.execute(()
-> w.onBookiesChanged(bookies)));
- });
- handleDeletedBookieNode(n);
- } else if (n.getPath().startsWith(bookieRegistrationPath)) {
- getWritableBookies().thenAccept(bookies ->
- writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
- handleDeletedBookieNode(n);
- }
- } else if (n.getType() == NotificationType.Modified) {
- if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
- || n.getPath().startsWith(bookieRegistrationPath)) {
- handleUpdatedBookieNode(n);
+ final BookieId bookieId = stripBookieIdFromPath(n.getPath());
+ sequencer.sequential(() -> {
+ switch (n.getType()) {
+ case Created:
+ log.info("Bookie {} created. path: {}", bookieId,
n.getPath());
+ if (path.startsWith(bookieReadonlyRegistrationPath)) {
+ return getReadOnlyBookies().thenAccept(bookies ->
+ readOnlyBookiesWatchers.forEach(w ->
+ executor.execute(() ->
w.onBookiesChanged(bookies))));
+ }
+ return getWritableBookies().thenAccept(bookies ->
+ writableBookiesWatchers.forEach(w ->
+ executor.execute(() ->
w.onBookiesChanged(bookies))));
+ case Modified:
+ if (bookieId == null) {
+ return completedFuture(null);
+ }
+ log.info("Bookie {} modified. path: {}", bookieId,
n.getPath());
+ if (path.startsWith(bookieReadonlyRegistrationPath)) {
+ return
readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null);
+ }
+ return
readBookieInfoAsWritableBookie(bookieId).thenApply(__ -> null);
+ case Deleted:
+ if (bookieId == null) {
+ return completedFuture(null);
+ }
+ log.info("Bookie {} deleted. path: {}", bookieId,
n.getPath());
+ if (path.startsWith(bookieReadonlyRegistrationPath)) {
+ readOnlyBookieInfo.remove(bookieId);
+ return getReadOnlyBookies().thenAccept(bookies -> {
+ readOnlyBookiesWatchers.forEach(w ->
+ executor.execute(() ->
w.onBookiesChanged(bookies)));
+ });
+ }
+ if (path.startsWith(bookieRegistrationPath)) {
+ writableBookieInfo.remove(bookieId);
+ return getWritableBookies().thenAccept(bookies -> {
+ writableBookiesWatchers.forEach(w ->
+ executor.execute(() ->
w.onBookiesChanged(bookies)));
+ });
+ }
+ return completedFuture(null);
+ default:
+ return completedFuture(null);
}
- }
+ });
}
private static BookieId stripBookieIdFromPath(String path) {
@@ -200,7 +255,7 @@ public class PulsarRegistrationClient implements
RegistrationClient {
try {
return BookieId.parse(path.substring(slash + 1));
} catch (IllegalArgumentException e) {
- log.warn("Cannot decode bookieId from {}", path, e);
+ log.warn("Cannot decode bookieId from {}, error: {}", path,
e.getMessage());
}
}
return null;
@@ -227,46 +282,48 @@ public class PulsarRegistrationClient implements
RegistrationClient {
// this is because there are a few cases in which some operations on
the main thread
// wait for the result. This is due to the fact that resolving the
address of a bookie
// is needed in many code paths.
- Versioned<BookieServiceInfo> resultFromCache =
bookieServiceInfoCache.get(bookieId);
+ Versioned<BookieServiceInfo> info;
+ if ((info = writableBookieInfo.get(bookieId)) == null) {
+ info = readOnlyBookieInfo.get(bookieId);
+ }
if (log.isDebugEnabled()) {
- log.debug("getBookieServiceInfo {} -> {}", bookieId,
resultFromCache);
+ log.debug("getBookieServiceInfo {} -> {}", bookieId, info);
}
- if (resultFromCache != null) {
- return CompletableFuture.completedFuture(resultFromCache);
+ if (info != null) {
+ return completedFuture(info);
} else {
return FutureUtils.exception(new
BKException.BKBookieHandleNotAvailableException());
}
}
- public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId
bookieId) {
- String asWritable = bookieRegistrationPath + "/" + bookieId;
- return bookieServiceInfoMetadataCache.get(asWritable)
- .thenCompose((Optional<BookieServiceInfo> getResult) -> {
- if (getResult.isPresent()) {
- Versioned<BookieServiceInfo> res =
- new Versioned<>(getResult.get(), new
LongVersion(-1));
- log.info("Update BookieInfoCache (writable bookie) {}
-> {}", bookieId, getResult.get());
- bookieServiceInfoCache.put(bookieId, res);
- return CompletableFuture.completedFuture(null);
- } else {
- return readBookieInfoAsReadonlyBookie(bookieId);
- }
- }
- );
+ public CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>>
readBookieInfoAsWritableBookie(
+ BookieId bookieId) {
+ final String asWritable = bookieRegistrationPath + "/" + bookieId;
+ return bookieServiceInfoMetadataCache.getWithStats(asWritable)
+ .thenApply((Optional<CacheGetResult<BookieServiceInfo>>
bkInfoWithStats) -> {
+ if (bkInfoWithStats.isPresent()) {
+ final CacheGetResult<BookieServiceInfo> r =
bkInfoWithStats.get();
+ log.info("Update BookieInfoCache (writable
bookie) {} -> {}", bookieId, r.getValue());
+ writableBookieInfo.put(bookieId,
+ new Versioned<>(r.getValue(), new
LongVersion(r.getStat().getVersion())));
+ }
+ return bkInfoWithStats;
+ }
+ );
}
- final CompletableFuture<Void> readBookieInfoAsReadonlyBookie(BookieId
bookieId) {
- String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
- return bookieServiceInfoMetadataCache.get(asReadonly)
- .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly)
-> {
- if (getResultAsReadOnly.isPresent()) {
- Versioned<BookieServiceInfo> res =
- new Versioned<>(getResultAsReadOnly.get(), new
LongVersion(-1));
- log.info("Update BookieInfoCache (readonly bookie) {}
-> {}", bookieId,
- getResultAsReadOnly.get());
- bookieServiceInfoCache.put(bookieId, res);
+ final CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>>
readBookieInfoAsReadonlyBookie(
+ BookieId bookieId) {
+ final String asReadonly = bookieReadonlyRegistrationPath + "/" +
bookieId;
+ return bookieServiceInfoMetadataCache.getWithStats(asReadonly)
+ .thenApply((Optional<CacheGetResult<BookieServiceInfo>>
bkInfoWithStats) -> {
+ if (bkInfoWithStats.isPresent()) {
+ final CacheGetResult<BookieServiceInfo> r =
bkInfoWithStats.get();
+ log.info("Update BookieInfoCache (readonly bookie) {}
-> {}", bookieId, r.getValue());
+ readOnlyBookieInfo.put(bookieId,
+ new Versioned<>(r.getValue(), new
LongVersion(r.getStat().getVersion())));
}
- return null;
+ return bkInfoWithStats;
});
}
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java
new file mode 100644
index 00000000000..bcbf41addba
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.metadata.bookkeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
+import org.apache.bookkeeper.bookie.BookieException.CookieExistException;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
+import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.BKInterruptedException;
+import org.apache.bookkeeper.client.BKException.MetaStoreException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationClient;
+import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
+import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.ZkLayoutManager;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Fault injectable ZK registration manager.
+ * Copied from {@link org.apache.bookkeeper.discover.ZKRegistrationManager}.
+ */
+@Slf4j
+public class FaultInjectableZKRegistrationManager implements
RegistrationManager {
+
+ private static final Function<Throwable, BKException> EXCEPTION_FUNC =
cause -> {
+ if (cause instanceof BKException) {
+ log.error("Failed to get bookie list : ", cause);
+ return (BKException) cause;
+ } else if (cause instanceof InterruptedException) {
+ log.error("Interrupted reading bookie list : ", cause);
+ return new BKInterruptedException();
+ } else {
+ return new MetaStoreException();
+ }
+ };
+
+ private final ServerConfiguration conf;
+ private final ZooKeeper zk;
+ private final List<ACL> zkAcls;
+ private final LayoutManager layoutManager;
+
+ private volatile boolean zkRegManagerInitialized = false;
+
+ // ledgers root path
+ private final String ledgersRootPath;
+ // cookie path
+ private final String cookiePath;
+ // registration paths
+ protected final String bookieRegistrationPath;
+ protected final String bookieReadonlyRegistrationPath;
+ // session timeout in milliseconds
+ private final int zkTimeoutMs;
+ private final List<RegistrationListener> listeners = new ArrayList<>();
+ private Function<Void, Void> hookOnRegisterReadOnly;
+
+ public FaultInjectableZKRegistrationManager(ServerConfiguration conf,
+ ZooKeeper zk) {
+ this(conf, zk, ZKMetadataDriverBase.resolveZkLedgersRootPath(conf));
+ }
+
+ public FaultInjectableZKRegistrationManager(ServerConfiguration conf,
+ ZooKeeper zk,
+ String ledgersRootPath) {
+ this.conf = conf;
+ this.zk = zk;
+ this.zkAcls = ZkUtils.getACLs(conf);
+ this.ledgersRootPath = ledgersRootPath;
+ this.cookiePath = ledgersRootPath + "/" + COOKIE_NODE;
+ this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
+ this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath +
"/" + READONLY;
+ this.zkTimeoutMs = conf.getZkTimeout();
+
+ this.layoutManager = new ZkLayoutManager(
+ zk,
+ ledgersRootPath,
+ zkAcls);
+
+ this.zk.register(event -> {
+ if (!zkRegManagerInitialized) {
+ // do nothing until first registration
+ return;
+ }
+ // Check for expired connection.
+ if (event.getType().equals(EventType.None)
+ && event.getState().equals(KeeperState.Expired)) {
+ listeners.forEach(RegistrationListener::onRegistrationExpired);
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+
+ /**
+ * Returns the CookiePath of the bookie in the ZooKeeper.
+ *
+ * @param bookieId bookie id
+ * @return the cookie znode path for the given bookie
+ */
+ public String getCookiePath(BookieId bookieId) {
+ return this.cookiePath + "/" + bookieId;
+ }
+
+ //
+ // Registration Management
+ //
+
+ /**
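+ * Check existence of <i>regPath</i> and wait for it to expire if possible.
+ *
+ * @param regPath reg node path.
+ * @return true if regPath exists and is owned by the current session, otherwise false
+ * @throws IOException if the ZooKeeper check fails, the previous node does not expire in time, or the wait is interrupted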
+ */
+ protected boolean checkRegNodeAndWaitExpired(String regPath) throws
IOException {
+ final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+ Watcher zkPrevRegNodewatcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Check for prev znode deletion. Connection expiration is
+ // not handled here, since the bookie has logic to shut down.
+ if (EventType.NodeDeleted == event.getType()) {
+ prevNodeLatch.countDown();
+ }
+ }
+ };
+ try {
+ Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
+ if (null != stat) {
+ // if the ephemeral owner isn't current zookeeper client
+ // wait for it to be expired.
+ if (stat.getEphemeralOwner() != zk.getSessionId()) {
+ log.info("Previous bookie registration znode: {} exists,
so waiting zk sessiontimeout:"
+ + " {} ms for znode deletion", regPath,
zkTimeoutMs);
+ // waiting for the previous bookie reg znode deletion
+ if (!prevNodeLatch.await(zkTimeoutMs,
TimeUnit.MILLISECONDS)) {
+ throw new NodeExistsException(regPath);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } catch (KeeperException ke) {
+ log.error("ZK exception checking and wait ephemeral znode {}
expired : ", regPath, ke);
+ throw new IOException("ZK exception checking and wait ephemeral
znode "
+ + regPath + " expired", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted checking and wait ephemeral znode {}
expired : ", regPath, ie);
+ throw new IOException("Interrupted checking and wait ephemeral
znode "
+ + regPath + " expired", ie);
+ }
+ }
+
+ @Override
+ public void registerBookie(BookieId bookieId, boolean readOnly,
+ BookieServiceInfo bookieServiceInfo) throws
BookieException {
+ if (!readOnly) {
+ String regPath = bookieRegistrationPath + "/" + bookieId;
+ doRegisterBookie(regPath, bookieServiceInfo);
+ } else {
+ doRegisterReadOnlyBookie(bookieId, bookieServiceInfo);
+ }
+ }
+
+ @VisibleForTesting
+ static byte[] serializeBookieServiceInfo(BookieServiceInfo
bookieServiceInfo) {
+ if (log.isDebugEnabled()) {
+ log.debug("serialize BookieServiceInfo {}", bookieServiceInfo);
+ }
+ try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+ BookieServiceInfoFormat.Builder builder =
BookieServiceInfoFormat.newBuilder();
+ List<BookieServiceInfoFormat.Endpoint> bsiEndpoints =
bookieServiceInfo.getEndpoints().stream()
+ .map(e -> {
+ return BookieServiceInfoFormat.Endpoint.newBuilder()
+ .setId(e.getId())
+ .setPort(e.getPort())
+ .setHost(e.getHost())
+ .setProtocol(e.getProtocol())
+ .addAllAuth(e.getAuth())
+ .addAllExtensions(e.getExtensions())
+ .build();
+ })
+ .collect(Collectors.toList());
+
+ builder.addAllEndpoints(bsiEndpoints);
+ builder.putAllProperties(bookieServiceInfo.getProperties());
+
+ builder.build().writeTo(os);
+ return os.toByteArray();
+ } catch (IOException err) {
+ log.error("Cannot serialize bookieServiceInfo from " +
bookieServiceInfo);
+ throw new RuntimeException(err);
+ }
+ }
+
+ private void doRegisterBookie(String regPath, BookieServiceInfo
bookieServiceInfo) throws BookieException {
+ // ZK ephemeral node for this Bookie.
+ try {
+ if (!checkRegNodeAndWaitExpired(regPath)) {
+ // Create the ZK ephemeral node for this Bookie.
+ zk.create(regPath,
serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL);
+ zkRegManagerInitialized = true;
+ }
+ } catch (KeeperException ke) {
+ log.error("ZK exception registering ephemeral Znode for Bookie!",
ke);
+ // Throw a MetadataStoreException back up. This will cause the Bookie
+ // constructor to error out. Alternatively, we could do a System
+ // exit here as this is a fatal error.
+ throw new MetadataStoreException(ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted exception registering ephemeral Znode for
Bookie!", ie);
+ // Throw a MetadataStoreException back up. This will cause the Bookie
+ // constructor to error out. Alternatively, we could do a System
+ // exit here as this is a fatal error.
+ throw new MetadataStoreException(ie);
+ } catch (IOException e) {
+ throw new MetadataStoreException(e);
+ }
+ }
+
+ private void doRegisterReadOnlyBookie(BookieId bookieId, BookieServiceInfo
bookieServiceInfo)
+ throws BookieException {
+ try {
+ if (null == zk.exists(this.bookieReadonlyRegistrationPath, false))
{
+ try {
+ zk.create(this.bookieReadonlyRegistrationPath,
serializeBookieServiceInfo(bookieServiceInfo),
+ zkAcls, CreateMode.PERSISTENT);
+ } catch (NodeExistsException e) {
+ // this node is just now created by someone.
+ }
+ }
+ String regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+ doRegisterBookie(regPath, bookieServiceInfo);
+ // clear the write state
+ regPath = bookieRegistrationPath + "/" + bookieId;
+ try {
+ if (hookOnRegisterReadOnly != null) {
+ hookOnRegisterReadOnly.apply(null);
+ }
+ // Clear the current registered node
+ zk.delete(regPath, -1);
+ } catch (KeeperException.NoNodeException nne) {
+ log.warn("No writable bookie registered node {} when
transitioning to readonly",
+ regPath, nne);
+ }
+ } catch (KeeperException | InterruptedException e) {
+ throw new MetadataStoreException(e);
+ }
+ }
+
+ @Override
+ public void unregisterBookie(BookieId bookieId, boolean readOnly) throws
BookieException {
+ String regPath;
+ if (!readOnly) {
+ regPath = bookieRegistrationPath + "/" + bookieId;
+ } else {
+ regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+ }
+ doUnregisterBookie(regPath);
+ }
+
+ private void doUnregisterBookie(String regPath) throws BookieException {
+ try {
+ zk.delete(regPath, -1);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new MetadataStoreException(ie);
+ } catch (KeeperException e) {
+ throw new MetadataStoreException(e);
+ }
+ }
+
+ //
+ // Cookie Management
+ //
+
+ @Override
+ public void writeCookie(BookieId bookieId,
+ Versioned<byte[]> cookieData) throws
BookieException {
+ String zkPath = getCookiePath(bookieId);
+ try {
+ if (Version.NEW == cookieData.getVersion()) {
+ if (zk.exists(cookiePath, false) == null) {
+ try {
+ zk.create(cookiePath, new byte[0], zkAcls,
CreateMode.PERSISTENT);
+ } catch (NodeExistsException nne) {
+ log.info("More than one bookie tried to create {} at
once. Safe to ignore.",
+ cookiePath);
+ }
+ }
+ zk.create(zkPath, cookieData.getValue(), zkAcls,
CreateMode.PERSISTENT);
+ } else {
+ if (!(cookieData.getVersion() instanceof LongVersion)) {
+ throw new BookieIllegalOpException("Invalid version type,
expected it to be LongVersion");
+ }
+ zk.setData(
+ zkPath,
+ cookieData.getValue(),
+ (int) ((LongVersion)
cookieData.getVersion()).getLongVersion());
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new MetadataStoreException("Interrupted writing cookie for
bookie " + bookieId, ie);
+ } catch (NoNodeException nne) {
+ throw new CookieNotFoundException(bookieId.toString());
+ } catch (NodeExistsException nee) {
+ throw new CookieExistException(bookieId.toString());
+ } catch (KeeperException e) {
+ throw new MetadataStoreException("Failed to write cookie for
bookie " + bookieId);
+ }
+ }
+
+ @Override
+ public Versioned<byte[]> readCookie(BookieId bookieId) throws
BookieException {
+ String zkPath = getCookiePath(bookieId);
+ try {
+ Stat stat = zk.exists(zkPath, false);
+ byte[] data = zk.getData(zkPath, false, stat);
+ // sets stat version from ZooKeeper
+ LongVersion version = new LongVersion(stat.getVersion());
+ return new Versioned<>(data, version);
+ } catch (NoNodeException nne) {
+ throw new CookieNotFoundException(bookieId.toString());
+ } catch (KeeperException | InterruptedException e) {
+ throw new MetadataStoreException("Failed to read cookie for bookie
" + bookieId);
+ }
+ }
+
+ @Override
+ public void removeCookie(BookieId bookieId, Version version) throws
BookieException {
+ String zkPath = getCookiePath(bookieId);
+ try {
+ zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
+ } catch (NoNodeException e) {
+ throw new CookieNotFoundException(bookieId.toString());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new MetadataStoreException("Interrupted deleting cookie for
bookie " + bookieId, e);
+ } catch (KeeperException e) {
+ throw new MetadataStoreException("Failed to delete cookie for
bookie " + bookieId);
+ }
+
+ log.info("Removed cookie from {} for bookie {}.", cookiePath,
bookieId);
+ }
+
+
+ @Override
+ public String getClusterInstanceId() throws BookieException {
+ String instanceId = null;
+ try {
+ if (zk.exists(ledgersRootPath, null) == null) {
+ log.error("BookKeeper metadata doesn't exist in zookeeper. "
+ + "Has the cluster been initialized? "
+ + "Try running bin/bookkeeper shell metaformat");
+ throw new KeeperException.NoNodeException("BookKeeper
metadata");
+ }
+ try {
+ byte[] data = zk.getData(ledgersRootPath + "/"
+ + INSTANCEID, false, null);
+ instanceId = new String(data, UTF_8);
+ } catch (KeeperException.NoNodeException e) {
+ log.info("INSTANCEID not exists in zookeeper. Not considering
it for data verification");
+ }
+ } catch (KeeperException | InterruptedException e) {
+ throw new MetadataStoreException("Failed to get cluster instance
id", e);
+ }
+ return instanceId;
+ }
+
+    /**
+     * Ensures the znodes needed before a format exist: the ledgers root, the available-bookies
+     * node and the readonly-bookies node. Missing ones are created as persistent, empty znodes.
+     *
+     * @return {@code true} if the ledgers root already existed when this method was called
+     * @throws Exception if any ZooKeeper call fails
+     */
+    @Override
+    public boolean prepareFormat() throws Exception {
+        // Both existence checks are taken up front, before anything is created.
+        final boolean rootPresent = zk.exists(ledgersRootPath, false) != null;
+        final boolean availablePresent = zk.exists(bookieRegistrationPath, false) != null;
+        final byte[] noData = new byte[0];
+
+        if (!rootPresent) {
+            // Ledgers root (parents are created as needed).
+            ZkUtils.createFullPathOptimistic(zk, ledgersRootPath, noData, zkAcls, CreateMode.PERSISTENT);
+        }
+        if (!availablePresent) {
+            // Available bookies node.
+            zk.create(bookieRegistrationPath, noData, zkAcls, CreateMode.PERSISTENT);
+        }
+        if (zk.exists(bookieReadonlyRegistrationPath, false) == null) {
+            // Readonly bookies node.
+            zk.create(bookieReadonlyRegistrationPath, noData, zkAcls, CreateMode.PERSISTENT);
+        }
+
+        return rootPresent;
+    }
+
+    /**
+     * Initializes the ZooKeeper metadata of a brand-new cluster.
+     *
+     * <p>The ledgers root, the available-bookies node, the readonly-bookies node and a freshly
+     * generated INSTANCEID are created in a single multi-op, then the ledger manager factory is
+     * instantiated.
+     *
+     * @return {@code false} if the ledgers root already exists (nothing is changed), otherwise
+     *         {@code true}
+     * @throws Exception if any ZooKeeper or ledger-manager call fails
+     */
+    @Override
+    public boolean initNewCluster() throws Exception {
+        final String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
+        log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}",
+                zkServers, ledgersRootPath);
+
+        if (zk.exists(ledgersRootPath, false) != null) {
+            log.error("Ledger root path: {} already exists", ledgersRootPath);
+            return false;
+        }
+
+        final String newInstanceId = UUID.randomUUID().toString();
+        final List<Op> createOps = Lists.newArrayListWithExpectedSize(4);
+        // ledgers root
+        createOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
+        // available bookies
+        createOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
+        // readonly bookies
+        createOps.add(Op.create(bookieReadonlyRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
+        // INSTANCEID
+        createOps.add(Op.create(ledgersRootPath + "/" + INSTANCEID, newInstanceId.getBytes(UTF_8),
+                zkAcls, CreateMode.PERSISTENT));
+
+        // All four znodes are submitted as one multi-op.
+        zk.multi(createOps);
+
+        // creates the new layout and stores in zookeeper
+        AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+
+        log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}",
+                zkServers, ledgersRootPath, newInstanceId);
+        return true;
+    }
+
+    /**
+     * Removes the metadata of an existing cluster, refusing to do so while bookies are registered.
+     *
+     * @return {@code true} if there is no ledgers root to remove; {@code false} if writable or
+     *         readonly bookies are still registered; otherwise the result of
+     *         {@code validateAndNukeExistingCluster} on the ledger manager factory
+     * @throws Exception if any ZooKeeper, registration-client or ledger-manager call fails
+     */
+    @Override
+    public boolean nukeExistingCluster() throws Exception {
+        final String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
+        log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}",
+                zkServers, ledgersRootPath);
+
+        if (zk.exists(ledgersRootPath, false) == null) {
+            log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, "
+                    + "so exiting nuke operation", ledgersRootPath, zkServers);
+            return true;
+        }
+
+        final boolean hasAvailableNode = zk.exists(bookieRegistrationPath, false) != null;
+        try (RegistrationClient regClient = new ZKRegistrationClient(zk, ledgersRootPath, null, false)) {
+            if (hasAvailableNode) {
+                final Collection<BookieId> writable =
+                        FutureUtils.result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue();
+                if (writable != null && !writable.isEmpty()) {
+                    log.error("Bookies are still up and connected to this cluster, "
+                            + "stop all bookies before nuking the cluster");
+                    return false;
+                }
+
+                // The readonly list is only consulted when its parent znode exists.
+                if (zk.exists(bookieReadonlyRegistrationPath, false) != null) {
+                    final Collection<BookieId> readOnly =
+                            FutureUtils.result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
+                    if (readOnly != null && !readOnly.isEmpty()) {
+                        log.error("Readonly Bookies are still up and connected to this cluster, "
+                                + "stop all bookies before nuking the cluster");
+                        return false;
+                    }
+                }
+            }
+        }
+
+        final LedgerManagerFactory factory =
+                AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+        return factory.validateAndNukeExistingCluster(conf, layoutManager);
+    }
+
+    /**
+     * Formats the BookKeeper metadata kept in ZooKeeper.
+     *
+     * <p>Deletes, in order, the underreplicated-ledgers tree, the underreplication lock node, the
+     * cookies tree and the INSTANCEID znode; a node that is already absent is ignored. A new
+     * random INSTANCEID is then written.
+     *
+     * @return always {@code true} when the method completes normally
+     * @throws Exception if a ZooKeeper call fails for a reason other than a missing node
+     */
+    @Override
+    public boolean format() throws Exception {
+        final String urBasePath = ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath);
+        final String instanceIdPath = ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID;
+
+        // Underreplicated ledgers.
+        try {
+            ZKUtil.deleteRecursive(zk, urBasePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
+        } catch (KeeperException.NoNodeException absent) {
+            log.debug("underreplicated ledgers root path node not exists in zookeeper to delete");
+        }
+
+        // Underreplicated-ledger locks.
+        try {
+            ZKUtil.deleteRecursive(zk, urBasePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK);
+        } catch (KeeperException.NoNodeException absent) {
+            log.debug("underreplicatedledger locks node not exists in zookeeper to delete");
+        }
+
+        // Cookies.
+        try {
+            ZKUtil.deleteRecursive(zk, cookiePath);
+        } catch (KeeperException.NoNodeException absent) {
+            log.debug("cookies node not exists in zookeeper to delete");
+        }
+
+        // Old INSTANCEID, whatever its version.
+        try {
+            zk.delete(instanceIdPath, -1);
+        } catch (KeeperException.NoNodeException absent) {
+            log.debug("INSTANCEID not exists in zookeeper to delete");
+        }
+
+        // New INSTANCEID.
+        final String freshInstanceId = UUID.randomUUID().toString();
+        zk.create(instanceIdPath, freshInstanceId.getBytes(StandardCharsets.UTF_8), zkAcls,
+                CreateMode.PERSISTENT);
+
+        log.info("Successfully formatted BookKeeper metadata");
+        return true;
+    }
+
+    /**
+     * Tells whether a registration znode exists for the bookie, either under the available path
+     * or under the readonly path.
+     *
+     * @param bookieId bookie to look up
+     * @return {@code true} if either znode exists
+     * @throws MetadataStoreException if ZooKeeper reports an error or the thread is interrupted
+     */
+    @Override
+    public boolean isBookieRegistered(BookieId bookieId) throws BookieException {
+        final String writablePath = bookieRegistrationPath + "/" + bookieId;
+        final String readOnlyPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+        try {
+            // The readonly path is only queried when the writable one is absent.
+            if (zk.exists(writablePath, false) != null) {
+                return true;
+            }
+            return zk.exists(readOnlyPath, false) != null;
+        } catch (KeeperException e) {
+            log.error("ZK exception while checking registration ephemeral znodes for BookieId: {}", bookieId, e);
+            throw new MetadataStoreException(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("InterruptedException while checking registration ephemeral znodes for BookieId: {}",
+                    bookieId, e);
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    /**
+     * Adds a listener to this manager's listener collection.
+     *
+     * @param listener listener to add
+     */
+    @Override
+    public void addRegistrationListener(RegistrationListener listener) {
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Installs the fault-injection hook stored in {@code hookOnRegisterReadOnly}.
+     *
+     * <p>NOTE(review): the call site is outside this part of the file; judging by the name, the
+     * hook presumably runs in the middle of a read-only bookie registration. Confirm there.
+     *
+     * @param fn hook to install; replaces any previously installed one
+     */
+    public void betweenRegisterReadOnlyBookie(Function<Void, Void> fn) {
+        this.hookOnRegisterReadOnly = fn;
+    }
+}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 409a6724381..c4f56915119 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -43,9 +43,8 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
-import org.apache.bookkeeper.discover.RegistrationClient;
-import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.*;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.versioning.Version;
@@ -53,6 +52,7 @@ import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
@@ -169,10 +169,10 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
getAndVerifyAllBookies(rc, addresses);
Awaitility.await().untilAsserted(() -> {
- for (BookieId address : addresses) {
- BookieServiceInfo bookieServiceInfo =
rc.getBookieServiceInfo(address).get().getValue();
- compareBookieServiceInfo(bookieServiceInfo,
bookieServiceInfos.get(address));
- }});
+ for (BookieId address : addresses) {
+ BookieServiceInfo bookieServiceInfo =
rc.getBookieServiceInfo(address).get().getValue();
+ compareBookieServiceInfo(bookieServiceInfo,
bookieServiceInfos.get(address));
+ }});
// shutdown the bookies (but keep the cookie)
for (BookieId address : addresses) {
@@ -185,12 +185,12 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
// getBookieServiceInfo should fail with
BKBookieHandleNotAvailableException
Awaitility.await().untilAsserted(() -> {
- for (BookieId address : addresses) {
- assertTrue(
- expectThrows(ExecutionException.class, () -> {
- rc.getBookieServiceInfo(address).get();
- }).getCause() instanceof
BKException.BKBookieHandleNotAvailableException);
- }});
+ for (BookieId address : addresses) {
+ assertTrue(
+ expectThrows(ExecutionException.class, () -> {
+ rc.getBookieServiceInfo(address).get();
+ }).getCause() instanceof
BKException.BKBookieHandleNotAvailableException);
+ }});
// restart the bookies, all writable
@@ -242,12 +242,12 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
.await()
.ignoreExceptionsMatching(e -> e.getCause() instanceof
BKException.BKBookieHandleNotAvailableException)
.untilAsserted(() -> {
- // verify that infos are updated
- for (BookieId address : addresses) {
- BookieServiceInfo bookieServiceInfo =
rc.getBookieServiceInfo(address).get().getValue();
- compareBookieServiceInfo(bookieServiceInfo,
bookieServiceInfos.get(address));
- }
- });
+ // verify that infos are updated
+ for (BookieId address : addresses) {
+ BookieServiceInfo bookieServiceInfo =
rc.getBookieServiceInfo(address).get().getValue();
+ compareBookieServiceInfo(bookieServiceInfo,
bookieServiceInfos.get(address));
+ }
+ });
}
@@ -358,4 +358,88 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
});
}
+
+    /**
+     * Registers ten bookies through a {@link FaultInjectableZKRegistrationManager}, installs a
+     * hook that sleeps for one second, re-registers half of the bookies as read-only, and checks
+     * that two {@link PulsarRegistrationClient} instances still return the expected
+     * {@link BookieServiceInfo} for every bookie.
+     */
+    @Test
+    public void testNetworkDelayWithBkZkManager() throws Throwable {
+        final String zksConnectionString = zks.getConnectionString();
+        final String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
+        // prepare registration manager; the ZooKeeper session is closed when the test ends
+        @Cleanup
+        ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null);
+        final ServerConfiguration serverConfiguration = new ServerConfiguration();
+        serverConfiguration.setZkLedgersRootPath(ledgersRoot);
+        final FaultInjectableZKRegistrationManager rm =
+                new FaultInjectableZKRegistrationManager(serverConfiguration, zk);
+        rm.prepareFormat();
+        // prepare registration client
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(zksConnectionString,
+                MetadataStoreConfig.builder().build());
+        @Cleanup
+        RegistrationClient rc1 = new PulsarRegistrationClient(store, ledgersRoot);
+        @Cleanup
+        RegistrationClient rc2 = new PulsarRegistrationClient(store, ledgersRoot);
+
+        final List<BookieId> addresses = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            addresses.add(BookieId.parse("BOOKIE-" + i));
+        }
+        final Map<BookieId, BookieServiceInfo> bookieServiceInfos = new HashMap<>();
+
+        int port = 223;
+        for (BookieId address : addresses) {
+            BookieServiceInfo info = new BookieServiceInfo();
+            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+            endpoint.setAuth(Collections.emptyList());
+            endpoint.setExtensions(Collections.emptyList());
+            endpoint.setId("id");
+            endpoint.setHost("localhost");
+            endpoint.setPort(port++);
+            endpoint.setProtocol("bookie-rpc");
+            info.setEndpoints(Arrays.asList(endpoint));
+            bookieServiceInfos.put(address, info);
+            rm.registerBookie(address, false, info);
+            // write the cookie
+            rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW));
+        }
+
+        // trigger loading the BookieServiceInfo in the local cache
+        getAndVerifyAllBookies(rc1, addresses);
+        getAndVerifyAllBookies(rc2, addresses);
+
+        // verify the initial state: both clients see every bookie's service info
+        Awaitility.await().untilAsserted(() -> {
+            for (BookieId address : addresses) {
+                compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+                compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+            }
+        });
+
+        // mock network delay
+        rm.betweenRegisterReadOnlyBookie(__ -> {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the thread that runs the hook.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return null;
+        });
+
+        for (int i = 0; i < addresses.size() / 2; i++) {
+            final BookieId bkId = addresses.get(i);
+            // turn some bookies to be read only.
+            rm.registerBookie(bkId, true, bookieServiceInfos.get(bkId));
+        }
+
+        // the service info must still be available for every bookie on both clients
+        Awaitility.await().untilAsserted(() -> {
+            for (BookieId address : addresses) {
+                compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+                compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(),
+                        bookieServiceInfos.get(address));
+            }
+        });
+    }
}