This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cbeff98d513564418f4d800d04208d4206335c4e Author: Qiang Zhao <[email protected]> AuthorDate: Mon Aug 21 09:42:05 2023 +0800 [improve][meta] Improve fault tolerance of blocking calls by supporting timeout (#21028) (cherry picked from commit 976a58061fd87d577b7903622ed2e61f4bec7d22) --- .../bookkeeper/AbstractMetadataDriver.java | 2 + .../LegacyHierarchicalLedgerRangeIterator.java | 18 ++- .../LongHierarchicalLedgerRangeIterator.java | 7 +- .../metadata/bookkeeper/PulsarLayoutManager.java | 17 ++- .../bookkeeper/PulsarLedgerManagerFactory.java | 38 ++++++- .../PulsarLedgerUnderreplicationManager.java | 122 +++++++++++++-------- .../bookkeeper/PulsarRegistrationManager.java | 112 +++++++++++-------- 7 files changed, 208 insertions(+), 108 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java index cc5f759c73f..435f94b05dc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java @@ -21,6 +21,7 @@ package org.apache.pulsar.metadata.bookkeeper; import java.io.Closeable; import java.io.IOException; import java.net.URI; +import java.util.concurrent.TimeUnit; import lombok.SneakyThrows; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.discover.RegistrationClient; @@ -40,6 +41,7 @@ public abstract class AbstractMetadataDriver implements Closeable { public static final String METADATA_STORE_SCHEME = "metadata-store"; public static final String METADATA_STORE_INSTANCE = "metadata-store-instance"; + public static final long BLOCKING_CALL_TIMEOUT = TimeUnit.SECONDS.toMillis(30); protected MetadataStoreExtended store; private boolean storeInstanceIsOwned; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java index 15b1d561f90..37e6dc836f2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java @@ -18,17 +18,21 @@ */ package org.apache.pulsar.metadata.bookkeeper; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.NoSuchElementException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.util.StringUtils; import org.apache.pulsar.metadata.api.MetadataStore; + /** * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes. * @@ -67,7 +71,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg * @return false if have visited all level1 nodes * @throws InterruptedException/KeeperException if error occurs reading zookeeper children */ - private boolean nextL1Node() throws ExecutionException, InterruptedException { + private boolean nextL1Node() throws ExecutionException, InterruptedException, TimeoutException { l2NodesIter = null; while (l2NodesIter == null) { if (l1NodesIter.hasNext()) { @@ -79,7 +83,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg if (!isLedgerParentNode(curL1Nodes)) { continue; } - List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + curL1Nodes).get(); + List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + curL1Nodes) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); l2NodesIter = l2Nodes.iterator(); if (!l2NodesIter.hasNext()) { l2NodesIter = null; @@ -94,7 +99,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg boolean hasMoreElements = false; try { if (l1NodesIter == null) { - List<String> l1Nodes = store.getChildren(ledgersRoot).get(); + List<String> l1Nodes = store.getChildren(ledgersRoot) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); l1NodesIter = l1Nodes.iterator(); hasMoreElements = nextL1Node(); } else if (l2NodesIter == null || !l2NodesIter.hasNext()) { @@ -102,7 +108,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg } else { hasMoreElements = true; } - } catch (ExecutionException ke) { + } catch (ExecutionException | TimeoutException ke) { throw new IOException("Error preloading next range", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -156,8 +162,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg String nodePath = nodeBuilder.toString(); List<String> ledgerNodes = null; try { - ledgerNodes = store.getChildren(nodePath).get(); - } catch (ExecutionException e) { + ledgerNodes = store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { throw new IOException("Error when get child nodes from zk", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java index 9a36ac53b89..3b32916e6e7 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.util.StringUtils; @@ -57,8 +59,9 @@ class LongHierarchicalLedgerRangeIterator implements LedgerManager.LedgerRangeIt */ List<String> getChildrenAt(String path) throws IOException { try { - return store.getChildren(path).get(); - } catch (ExecutionException e) { + return store.getChildren(path) + .get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { if (log.isDebugEnabled()) { log.debug("Failed to get children at {}", path); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java index ee06930b3c8..99dc474f5cd 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java @@ -18,9 +18,12 @@ */ package org.apache.pulsar.metadata.bookkeeper; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import java.io.IOException; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import lombok.AccessLevel; import lombok.Getter; import org.apache.bookkeeper.bookie.BookieException; @@ -49,14 +52,14 @@ public class PulsarLayoutManager implements LayoutManager { @Override public LedgerLayout readLedgerLayout() throws IOException { try { - byte[] layoutData = store.get(layoutPath).get() + byte[] layoutData = store.get(layoutPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS) .orElseThrow(() -> new BookieException.MetadataStoreException("Layout node not found")) .getValue(); return LedgerLayout.parseLayout(layoutData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); - } catch (BookieException | ExecutionException e) { + } catch (BookieException | ExecutionException | TimeoutException e) { throw new IOException(e); } } @@ -66,10 +69,13 @@ public class PulsarLayoutManager implements LayoutManager { try { byte[] layoutData = ledgerLayout.serialize(); - store.put(layoutPath, layoutData, Optional.of(-1L)).get(); + store.put(layoutPath, layoutData, Optional.of(-1L)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); + } catch (TimeoutException e) { + throw new IOException(e); } catch (ExecutionException e) { if (e.getCause() instanceof MetadataStoreException.BadVersionException) { throw new LedgerLayoutExistsException(e); @@ -82,11 +88,12 @@ public class PulsarLayoutManager implements LayoutManager { @Override public void deleteLedgerLayout() throws IOException { try { - store.delete(layoutPath, Optional.empty()).get(); + store.delete(layoutPath, Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); - } catch (ExecutionException e) { + } catch (ExecutionException | TimeoutException e) { throw new IOException(e); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java index 1b229757c9c..bfcbf0b22d9 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java @@ -19,8 +19,12 @@ package org.apache.pulsar.metadata.bookkeeper; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.conf.AbstractConfiguration; @@ -110,7 +114,13 @@ public class PulsarLedgerManagerFactory implements LedgerManagerFactory { * before proceeding with nuking existing cluster, make sure there * are no unexpected nodes under ledgersRootPath */ - List<String> ledgersRootPathChildrenList = store.getChildren(ledgerRootPath).join(); + final List<String> ledgersRootPathChildrenList; + try { + ledgersRootPathChildrenList = store.getChildren(ledgerRootPath) + .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + throw new IOException(e); + } for (String ledgersRootPathChildren : ledgersRootPathChildrenList) { if ((!AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren)) && (!ledgerManager.isLedgerParentNode(ledgersRootPathChildren))) { @@ -124,18 +134,34 @@ public class PulsarLedgerManagerFactory implements LedgerManagerFactory { format(conf, layoutManager); // now delete all the special nodes recursively - for (String ledgersRootPathChildren : store.getChildren(ledgerRootPath).join()) { - if (AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren)) { - store.deleteRecursive(ledgerRootPath + "/" + ledgersRootPathChildren).join(); + final List<String> ledgersRootPathChildren; + try { + ledgersRootPathChildren = store.getChildren(ledgerRootPath) + .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + throw new IOException(e); + } + for (String ledgersRootPathChild :ledgersRootPathChildren) { + if (AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChild)) { + try { + store.deleteRecursive(ledgerRootPath + "/" + ledgersRootPathChild) + .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + throw new IOException(e); + } } else { log.error("Found unexpected node : {} under ledgersRootPath : {} so exiting nuke operation", - ledgersRootPathChildren, ledgerRootPath); + ledgersRootPathChild, ledgerRootPath); return false; } } // finally deleting the ledgers rootpath - store.deleteRecursive(ledgerRootPath).join(); + try { + store.deleteRecursive(ledgerRootPath).get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + throw new IOException(e); + } log.info("Successfully nuked existing cluster"); return true; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index 343c3165ec7..b7cde77bdc2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -19,12 +19,14 @@ package org.apache.pulsar.metadata.bookkeeper; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat; import static org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat; import static org.apache.bookkeeper.proto.DataFormats.LockDataFormat; import static org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; +import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import com.google.common.base.Joiner; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; @@ -43,6 +45,7 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -274,7 +277,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati try { String path = getUrLedgerPath(ledgerId); - Optional<GetResult> optRes = store.get(path).get(); + Optional<GetResult> optRes = store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (!optRes.isPresent()) { if (log.isDebugEnabled()) { log.debug("Ledger: {} is not marked underreplicated", ledgerId); @@ -295,7 +298,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati underreplicatedLedger.setCtime(ctime); underreplicatedLedger.setReplicaList(replicaList); return underreplicatedLedger; - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { throw new ReplicationException.UnavailableException("Error contacting with metadata store", ee); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -399,14 +402,16 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati public void acquireUnderreplicatedLedger(long ledgerId) throws ReplicationException { try { internalAcquireUnderreplicatedLedger(ledgerId); - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException | TimeoutException | InterruptedException e) { throw new ReplicationException.UnavailableException("Failed to acuire under-replicated ledger", e); } } - private void internalAcquireUnderreplicatedLedger(long ledgerId) throws ExecutionException, InterruptedException { + private void internalAcquireUnderreplicatedLedger(long ledgerId) throws ExecutionException, + InterruptedException, TimeoutException { String lockPath = getUrLedgerLockPath(urLockPath, ledgerId); - store.put(lockPath, LOCK_DATA, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).get(); + store.put(lockPath, LOCK_DATA, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } @Override @@ -417,7 +422,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati try { Lock l = heldLocks.get(ledgerId); if (l != null) { - store.delete(getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())).get(); + store.delete(getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (store instanceof ZKMetadataStore) { try { // clean up the hierarchy @@ -455,6 +461,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.error("Error deleting underreplicated ledger node", ee); throw new ReplicationException.UnavailableException("Error contacting metadata store", ee); } + } catch (TimeoutException ex) { + throw new ReplicationException.UnavailableException("Error contacting metadata store", ex); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting metadata store", ie); @@ -495,7 +503,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati while (queue.size() > 0 && curBatch.size() == 0) { String parent = queue.remove(); try { - for (String c : store.getChildren(parent).get()) { + for (String c : store.getChildren(parent).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { String child = parent + "/" + c; if (c.startsWith("urL")) { long ledgerId = getLedgerId(child); @@ -529,21 +537,23 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati } private long getLedgerToRereplicateFromHierarchy(String parent, long depth) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException, TimeoutException { if (depth == 4) { - List<String> children = new ArrayList<>(store.getChildren(parent).get()); + List<String> children = new ArrayList<>(store.getChildren(parent) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)); Collections.shuffle(children); while (!children.isEmpty()) { String tryChild = children.get(0); try { - List<String> locks = store.getChildren(urLockPath).get(); + List<String> locks = store.getChildren(urLockPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (locks.contains(tryChild)) { children.remove(tryChild); continue; } - Optional<GetResult> optRes = store.get(parent + "/" + tryChild).get(); + Optional<GetResult> optRes = store.get(parent + "/" + tryChild) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (!optRes.isPresent()) { if (log.isDebugEnabled()) { log.debug("{}/{} doesn't exist", parent, tryChild); @@ -572,7 +582,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati return -1; } - List<String> children = new ArrayList<>(store.getChildren(parent).join()); + List<String> children = new ArrayList<>(store.getChildren(parent).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)); Collections.shuffle(children); while (children.size() > 0) { @@ -595,7 +605,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati } try { return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0); - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { throw new ReplicationException.UnavailableException("Error contacting metadata store", ee); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -621,7 +631,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati // nothing found, wait for a watcher to trigger this.wait(1000); } - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { throw new ReplicationException.UnavailableException("Error contacting metadata store", ee); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -647,7 +657,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati try { Lock l = heldLocks.get(ledgerId); if (l != null) { - store.delete(l.getLockPath(), Optional.empty()).get(); + store.delete(l.getLockPath(), Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } } catch (ExecutionException ee) { if (ee.getCause() instanceof MetadataStoreException.NotFoundException) { @@ -656,6 +667,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.error("Error deleting underreplicated ledger lock", ee); throw new ReplicationException.UnavailableException("Error contacting metadata store", ee); } + } catch (TimeoutException ex) { + throw new ReplicationException.UnavailableException("Error contacting metadata store", ex); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie); @@ -670,7 +683,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati } try { for (Map.Entry<Long, Lock> e : heldLocks.entrySet()) { - store.delete(e.getValue().getLockPath(), Optional.empty()).get(); + store.delete(e.getValue().getLockPath(), Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } } catch (ExecutionException ee) { if (ee.getCause() instanceof MetadataStoreException.NotFoundException) { @@ -679,6 +693,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.error("Error deleting underreplicated ledger lock", ee); throw new ReplicationException.UnavailableException("Error contacting metadata store", ee); } + } catch (TimeoutException ex) { + throw new ReplicationException.UnavailableException("Error contacting metadata store", ex); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting metadata store", ie); @@ -692,9 +708,10 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("disableLedegerReplication()"); } try { - store.put(replicationDisablePath, "".getBytes(UTF_8), Optional.of(-1L)).get(); + store.put(replicationDisablePath, "".getBytes(UTF_8), Optional.of(-1L)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); log.info("Auto ledger re-replication is disabled!"); - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { log.error("Exception while stopping auto ledger re-replication", ee); throw new ReplicationException.UnavailableException( "Exception while stopping auto ledger re-replication", ee); @@ -712,9 +729,10 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("enableLedegerReplication()"); } try { - store.delete(replicationDisablePath, Optional.empty()).get(); + store.delete(replicationDisablePath, Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); log.info("Resuming automatic ledger re-replication"); - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { log.error("Exception while resuming ledger replication", ee); throw new ReplicationException.UnavailableException( "Exception while resuming auto ledger re-replication", ee); @@ -732,8 +750,9 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("isLedgerReplicationEnabled()"); } try { - return !store.exists(replicationDisablePath).get(); - } catch (ExecutionException ee) { + return !store.exists(replicationDisablePath) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + } catch (ExecutionException | TimeoutException ee) { log.error("Error while checking the state of " + "ledger re-replication", ee); throw new ReplicationException.UnavailableException( @@ -755,13 +774,14 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati replicationEnabledCallbacks.add(cb); } try { - if (!store.exists(replicationDisablePath).get()) { + if (!store.exists(replicationDisablePath) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { log.info("LedgerReplication is enabled externally through metadata store, " + "since DISABLE_NODE node is deleted"); cb.operationComplete(0, null); return; } - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { log.error("Error while checking the state of " + "ledger re-replication", ee); throw new ReplicationException.UnavailableException( @@ -779,7 +799,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati @Override public boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException { try { - return store.exists(getUrLedgerLockPath(urLockPath, ledgerId)).get(); + return store.exists(getUrLedgerLockPath(urLockPath, ledgerId)).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (Exception e) { throw new ReplicationException.UnavailableException("Failed to check if ledger is beinge replicated", e); } @@ -791,7 +811,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("initializeLostBookieRecoveryDelay()"); try { store.put(lostBookieRecoveryDelayPath, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8), - Optional.of(-1L)).get(); + Optional.of(-1L)).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (ExecutionException ee) { if (ee.getCause() instanceof MetadataStoreException.BadVersionException) { log.info("lostBookieRecoveryDelay node is already present, so using " @@ -801,6 +821,9 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.error("Error while initializing LostBookieRecoveryDelay", ee); throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } + } catch (TimeoutException ex) { + log.error("Error while initializing LostBookieRecoveryDelay", ex); + throw new ReplicationException.UnavailableException("Error contacting zookeeper", ex); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -814,9 +837,9 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("setLostBookieRecoveryDelay()"); try { store.put(lostBookieRecoveryDelayPath, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8), - Optional.empty()).get(); + Optional.empty()).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { log.error("Error while setting LostBookieRecoveryDelay ", ee); throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } catch (InterruptedException ie) { @@ -829,9 +852,10 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException { log.debug("getLostBookieRecoveryDelay()"); try { - byte[] data = store.get(lostBookieRecoveryDelayPath).get().get().getValue(); + byte[] data = store.get(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS) + .get().getValue(); return Integer.parseInt(new String(data, UTF_8)); - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { log.error("Error while getting LostBookieRecoveryDelay ", ee); throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } catch (InterruptedException ie) { @@ -848,12 +872,12 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati lostBookieRecoveryDelayCallbacks.add(cb); } try { - if (!store.exists(lostBookieRecoveryDelayPath).get()) { + if (!store.exists(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { cb.operationComplete(0, null); return; } - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { log.error("Error while checking the state of lostBookieRecoveryDelay", ee); throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } catch (InterruptedException ie) { @@ -867,7 +891,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati throws ReplicationException.UnavailableException { try { - Optional<GetResult> optRes = store.get(getUrLedgerLockPath(urLockPath, ledgerId)).get(); + Optional<GetResult> optRes = store.get(getUrLedgerLockPath(urLockPath, ledgerId)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (!optRes.isPresent()) { // this is ok. return null; @@ -878,7 +903,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati TextFormat.merge(new String(lockData, UTF_8), lockDataBuilder); LockDataFormat lock = lockDataBuilder.build(); return lock.getBookieId(); - } catch (ExecutionException e) { + } catch (ExecutionException | TimeoutException e) { log.error("Error while getting ReplicationWorkerId rereplicating Ledger", e); throw new ReplicationException.UnavailableException( "Error while getting ReplicationWorkerId rereplicating Ledger", e); @@ -902,8 +927,9 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati builder.setCheckAllLedgersCTime(checkAllLedgersCTime); byte[] checkAllLedgersFormatByteArray = builder.build().toByteArray(); - store.put(checkAllLedgersCtimePath, checkAllLedgersFormatByteArray, Optional.empty()).get(); - } catch (ExecutionException ee) { + store.put(checkAllLedgersCtimePath, checkAllLedgersFormatByteArray, Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + } catch (ExecutionException | TimeoutException ee) { throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -917,7 +943,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("setCheckAllLedgersCTime"); } try { - Optional<GetResult> optRes = store.get(checkAllLedgersCtimePath).get(); + Optional<GetResult> optRes = store.get(checkAllLedgersCtimePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (!optRes.isPresent()) { log.warn("checkAllLedgersCtimeZnode is not yet available"); return -1; @@ -926,7 +952,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati CheckAllLedgersFormat checkAllLedgersFormat = CheckAllLedgersFormat.parseFrom(data); return checkAllLedgersFormat.hasCheckAllLedgersCTime() ? checkAllLedgersFormat.getCheckAllLedgersCTime() : -1; - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -946,8 +972,9 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati PlacementPolicyCheckFormat.Builder builder = PlacementPolicyCheckFormat.newBuilder(); builder.setPlacementPolicyCheckCTime(placementPolicyCheckCTime); byte[] placementPolicyCheckFormatByteArray = builder.build().toByteArray(); - store.put(placementPolicyCheckCtimePath, placementPolicyCheckFormatByteArray, Optional.empty()).get(); - } catch (ExecutionException ke) { + store.put(placementPolicyCheckCtimePath, placementPolicyCheckFormatByteArray, Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + } catch (ExecutionException | TimeoutException ke) { throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -961,7 +988,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("getPlacementPolicyCheckCTime"); } try { - Optional<GetResult> optRes = store.get(placementPolicyCheckCtimePath).get(); + Optional<GetResult> optRes = store.get(placementPolicyCheckCtimePath) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (!optRes.isPresent()) { log.warn("placementPolicyCheckCtimeZnode is not yet available"); return -1; @@ -970,7 +998,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati PlacementPolicyCheckFormat placementPolicyCheckFormat = PlacementPolicyCheckFormat.parseFrom(data); return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime() ? placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1; - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -986,11 +1014,12 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati ReplicasCheckFormat.Builder builder = ReplicasCheckFormat.newBuilder(); builder.setReplicasCheckCTime(replicasCheckCTime); byte[] replicasCheckFormatByteArray = builder.build().toByteArray(); - store.put(replicasCheckCtimePath, replicasCheckFormatByteArray, Optional.empty()).get(); + store.put(replicasCheckCtimePath, replicasCheckFormatByteArray, Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (log.isDebugEnabled()) { log.debug("setReplicasCheckCTime completed successfully"); } - } catch (ExecutionException ke) { + } catch (ExecutionException | TimeoutException ke) { throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1001,7 +1030,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati @Override public long getReplicasCheckCTime() throws ReplicationException.UnavailableException { try { - Optional<GetResult> optRes = store.get(replicasCheckCtimePath).get(); + Optional<GetResult> optRes = store.get(replicasCheckCtimePath) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (!optRes.isPresent()) { log.warn("placementPolicyCheckCtimeZnode is not yet available"); return -1; @@ -1012,7 +1042,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati log.debug("getReplicasCheckCTime completed successfully"); } return replicasCheckFormat.hasReplicasCheckCTime() ? replicasCheckFormat.getReplicasCheckCTime() : -1; - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { throw new ReplicationException.UnavailableException("Error contacting zookeeper", ee); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java index 25c3f10aa18..c6aba6b7d93 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java @@ -19,10 +19,12 @@ package org.apache.pulsar.metadata.bookkeeper; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID; import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; +import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -32,8 +34,8 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import lombok.Cleanup; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.conf.AbstractConfiguration; @@ -85,12 +87,11 @@ public class PulsarRegistrationManager implements RegistrationManager { } @Override - @SneakyThrows public void close() { for (ResourceLock<BookieServiceInfo> rwBookie : bookieRegistration.values()) { try { - rwBookie.release().get(); - } catch (ExecutionException ignore) { + rwBookie.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + } catch (ExecutionException | TimeoutException ignore) { log.error("Cannot release correctly {}", rwBookie, ignore.getCause()); } catch (InterruptedException ignore) { log.error("Cannot release correctly {}", rwBookie, ignore); @@ -100,26 +101,30 @@ public class PulsarRegistrationManager implements RegistrationManager { for (ResourceLock<BookieServiceInfo> roBookie : bookieRegistrationReadOnly.values()) { try { - roBookie.release().get(); - } catch (ExecutionException ignore) { + roBookie.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + } catch (ExecutionException | TimeoutException ignore) { log.error("Cannot release correctly {}", roBookie, ignore.getCause()); } catch (InterruptedException ignore) { log.error("Cannot release correctly {}", roBookie, ignore); Thread.currentThread().interrupt(); } } - coordinationService.close(); + try { + coordinationService.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override public String getClusterInstanceId() throws BookieException { try { return store.get(ledgersRootPath + "/" + INSTANCEID) - .get() + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS) .map(res -> new String(res.getValue(), UTF_8)) .orElseThrow( () -> new BookieException.MetadataStoreException("BookKeeper cluster not initialized")); - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException | InterruptedException | TimeoutException e) { throw new BookieException.MetadataStoreException("Failed to get cluster instance id", e); } } @@ -136,22 +141,24 @@ public class PulsarRegistrationManager implements RegistrationManager { ResourceLock<BookieServiceInfo> rwRegistration = bookieRegistration.remove(bookieId); if (rwRegistration != null) { log.info("Bookie {} was already registered as writable, unregistering", bookieId); - rwRegistration.release().get(); + rwRegistration.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } bookieRegistrationReadOnly.put(bookieId, - lockManager.acquireLock(regPathReadOnly, bookieServiceInfo).get()); + lockManager.acquireLock(regPathReadOnly, bookieServiceInfo) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)); } else { ResourceLock<BookieServiceInfo> roRegistration = bookieRegistrationReadOnly.remove(bookieId); if (roRegistration != null) { log.info("Bookie {} was already registered as read-only, unregistering", bookieId); - roRegistration.release().get(); + roRegistration.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } bookieRegistration.put(bookieId, - lockManager.acquireLock(regPath, bookieServiceInfo).get()); + lockManager.acquireLock(regPath, bookieServiceInfo) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)); } - } catch (ExecutionException ee) { + } catch (ExecutionException | TimeoutException ee) { log.error("Exception registering ephemeral node for Bookie!", ee); // Throw an IOException back up. This will cause the Bookie // constructor to error out. Alternatively, we could do a System @@ -173,18 +180,18 @@ public class PulsarRegistrationManager implements RegistrationManager { if (readOnly) { ResourceLock<BookieServiceInfo> roRegistration = bookieRegistrationReadOnly.get(bookieId); if (roRegistration != null) { - roRegistration.release().get(); + roRegistration.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } } else { ResourceLock<BookieServiceInfo> rwRegistration = bookieRegistration.get(bookieId); if (rwRegistration != null) { - rwRegistration.release().get(); + rwRegistration.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new BookieException.MetadataStoreException(ie); - } catch (ExecutionException e) { + } catch (ExecutionException | TimeoutException e) { throw new BookieException.MetadataStoreException(e); } } @@ -195,8 +202,9 @@ public class PulsarRegistrationManager implements RegistrationManager { String readonlyRegPath = bookieReadonlyRegistrationPath + "/" + bookieId; try { - return (store.exists(regPath).get() || store.exists(readonlyRegPath).get()); - } catch (ExecutionException e) { + return (store.exists(regPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS) + || store.exists(readonlyRegPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)); + } catch (ExecutionException | TimeoutException e) { log.error("Exception while checking registration ephemeral nodes for BookieId: {}", bookieId, e); throw new BookieException.MetadataStoreException(e); } catch (InterruptedException e) { @@ -222,7 +230,8 @@ public class PulsarRegistrationManager implements RegistrationManager { version = ((LongVersion) cookieData.getVersion()).getLongVersion(); } - store.put(path, cookieData.getValue(), Optional.of(version)).get(); + store.put(path, cookieData.getValue(), Optional.of(version)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new BookieException.MetadataStoreException("Interrupted writing cookie for bookie " + bookieId, ie); @@ -232,6 +241,8 @@ public class PulsarRegistrationManager implements RegistrationManager { } else { throw new BookieException.MetadataStoreException("Failed to write cookie for bookie " + bookieId); } + } catch (TimeoutException ex) { + throw new BookieException.MetadataStoreException("Failed to write cookie for bookie " + bookieId, ex); } } @@ -239,7 +250,7 @@ public class PulsarRegistrationManager implements RegistrationManager { public Versioned<byte[]> readCookie(BookieId bookieId) throws BookieException { String path = this.cookiePath + "/" + bookieId; try { - Optional<GetResult> res = store.get(path).get(); + Optional<GetResult> res = store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); if (!res.isPresent()) { throw new BookieException.CookieNotFoundException(bookieId.toString()); } @@ -250,7 +261,7 @@ public class PulsarRegistrationManager implements RegistrationManager { } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new BookieException.MetadataStoreException(ie); - } catch (ExecutionException e) { + } catch (ExecutionException | TimeoutException e) { throw new BookieException.MetadataStoreException(e); } } @@ -259,7 +270,8 @@ public class PulsarRegistrationManager implements RegistrationManager { public void removeCookie(BookieId bookieId, Version version) throws BookieException { String path = this.cookiePath + "/" + bookieId; try { - store.delete(path, Optional.of(((LongVersion) version).getLongVersion())).get(); + store.delete(path, Optional.of(((LongVersion) version).getLongVersion())) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BookieException.MetadataStoreException("Interrupted deleting cookie for bookie " + bookieId, e); @@ -269,6 +281,8 @@ public class PulsarRegistrationManager implements RegistrationManager { } else { throw new BookieException.MetadataStoreException("Failed to delete cookie for bookie " + bookieId); } + } catch (TimeoutException ex) { + throw new BookieException.MetadataStoreException("Failed to delete cookie for bookie " + bookieId); } log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId); @@ -276,20 +290,23 @@ public class PulsarRegistrationManager implements RegistrationManager { @Override public boolean prepareFormat() throws Exception { - boolean ledgerRootExists = store.exists(ledgersRootPath).get(); - boolean availableNodeExists = store.exists(bookieRegistrationPath).get(); + boolean ledgerRootExists = store.exists(ledgersRootPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + boolean availableNodeExists = store.exists(bookieRegistrationPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); // Create ledgers root node if not exists if (!ledgerRootExists) { - store.put(ledgersRootPath, new byte[0], Optional.empty()).get(); + store.put(ledgersRootPath, new byte[0], Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } // create available bookies node if not exists if (!availableNodeExists) { - store.put(bookieRegistrationPath, new byte[0], Optional.empty()).get(); + store.put(bookieRegistrationPath, new byte[0], Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } // create readonly bookies node if not exists - if (!store.exists(bookieReadonlyRegistrationPath).get()) { - store.put(bookieReadonlyRegistrationPath, new byte[0], Optional.empty()).get(); + if (!store.exists(bookieReadonlyRegistrationPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { + store.put(bookieReadonlyRegistrationPath, new byte[0], Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } return ledgerRootExists; @@ -301,16 +318,18 @@ public class PulsarRegistrationManager implements RegistrationManager { log.info("Initializing metadata for new cluster, ledger root path: {}", ledgersRootPath); - if (store.exists(instanceIdPath).get()) { + if (store.exists(instanceIdPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { log.error("Ledger root path: {} already exists", ledgersRootPath); return false; } - store.put(ledgersRootPath, new byte[0], Optional.empty()).get(); + store.put(ledgersRootPath, new byte[0], Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); // create INSTANCEID String instanceId = UUID.randomUUID().toString(); - store.put(instanceIdPath, instanceId.getBytes(UTF_8), Optional.of(-1L)).join(); + store.put(instanceIdPath, instanceId.getBytes(UTF_8), Optional.of(-1L)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); log.info("Successfully initiated cluster. ledger root path: {} instanceId: {}", ledgersRootPath, instanceId); @@ -321,23 +340,28 @@ public class PulsarRegistrationManager implements RegistrationManager { public boolean format() throws Exception { // Clear underreplicated ledgers store.deleteRecursive(PulsarLedgerUnderreplicationManager.getBasePath(ledgersRootPath) - + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH).get(); + + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); // Clear underreplicatedledger locks - store.deleteRecursive(PulsarLedgerUnderreplicationManager.getUrLockPath(ledgersRootPath)).get(); + store.deleteRecursive(PulsarLedgerUnderreplicationManager.getUrLockPath(ledgersRootPath)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); // Clear the cookies - store.deleteRecursive(cookiePath).get(); + store.deleteRecursive(cookiePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); // Clear the INSTANCEID - if (store.exists(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID).get()) { - store.delete(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, Optional.empty()).get(); + if (store.exists(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { + store.delete(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, Optional.empty()) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } // create INSTANCEID String instanceId = UUID.randomUUID().toString(); store.put(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, - instanceId.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).get(); + instanceId.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); log.info("Successfully formatted BookKeeper metadata"); return true; @@ -347,7 +371,7 @@ public class PulsarRegistrationManager implements RegistrationManager { public boolean nukeExistingCluster() throws Exception { log.info("Nuking metadata of existing cluster, ledger root path: {}", ledgersRootPath); - if (!store.exists(ledgersRootPath + "/" + INSTANCEID).join()) { + if (!store.exists(ledgersRootPath + "/" + INSTANCEID).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) { log.info("There is no existing cluster with ledgersRootPath: {}, so exiting nuke operation", ledgersRootPath); return true; @@ -356,17 +380,19 @@ public class PulsarRegistrationManager implements RegistrationManager { @Cleanup RegistrationClient registrationClient = new PulsarRegistrationClient(store, ledgersRootPath); - Collection<BookieId> rwBookies = registrationClient.getWritableBookies().join().getValue(); + Collection<BookieId> rwBookies = registrationClient.getWritableBookies() + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS).getValue(); if (rwBookies != null && !rwBookies.isEmpty()) { log.error("Bookies are still up and connected to this cluster, " - + "stop all bookies before nuking the cluster"); + + "stop all bookies before nuking the cluster"); return false; } - Collection<BookieId> roBookies = registrationClient.getReadOnlyBookies().join().getValue(); + Collection<BookieId> roBookies = registrationClient.getReadOnlyBookies() + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS).getValue(); if (roBookies != null && !roBookies.isEmpty()) { log.error("Readonly Bookies are still up and connected to this cluster, " - + "stop all bookies before nuking the cluster"); + + "stop all bookies before nuking the cluster"); return false; }
