This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 976a58061fd [improve][meta] Improve fault tolerance of blocking calls by supporting timeout (#21028)
976a58061fd is described below
commit 976a58061fd87d577b7903622ed2e61f4bec7d22
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Aug 21 09:42:05 2023 +0800
[improve][meta] Improve fault tolerance of blocking calls by supporting timeout (#21028)
---
.../bookkeeper/AbstractMetadataDriver.java | 2 +
.../LegacyHierarchicalLedgerRangeIterator.java | 18 ++-
.../LongHierarchicalLedgerRangeIterator.java | 7 +-
.../metadata/bookkeeper/PulsarLayoutManager.java | 18 ++-
.../bookkeeper/PulsarLedgerManagerFactory.java | 38 ++++++-
.../PulsarLedgerUnderreplicationManager.java | 122 +++++++++++++--------
.../bookkeeper/PulsarRegistrationManager.java | 112 +++++++++++--------
7 files changed, 209 insertions(+), 108 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
index cc5f759c73f..435f94b05dc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.metadata.bookkeeper;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
+import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
@@ -40,6 +41,7 @@ public abstract class AbstractMetadataDriver implements
Closeable {
public static final String METADATA_STORE_SCHEME = "metadata-store";
public static final String METADATA_STORE_INSTANCE =
"metadata-store-instance";
+ public static final long BLOCKING_CALL_TIMEOUT =
TimeUnit.SECONDS.toMillis(30);
protected MetadataStoreExtended store;
private boolean storeInstanceIsOwned;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
index 15b1d561f90..37e6dc836f2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
@@ -18,17 +18,21 @@
*/
package org.apache.pulsar.metadata.bookkeeper;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.metadata.api.MetadataStore;
+
/**
* Hierarchical Ledger Manager which manages ledger meta in zookeeper using
2-level hierarchical znodes.
*
@@ -67,7 +71,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements
LedgerManager.Ledg
* @return false if have visited all level1 nodes
* @throws InterruptedException/KeeperException if error occurs reading
zookeeper children
*/
- private boolean nextL1Node() throws ExecutionException,
InterruptedException {
+ private boolean nextL1Node() throws ExecutionException,
InterruptedException, TimeoutException {
l2NodesIter = null;
while (l2NodesIter == null) {
if (l1NodesIter.hasNext()) {
@@ -79,7 +83,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements
LedgerManager.Ledg
if (!isLedgerParentNode(curL1Nodes)) {
continue;
}
- List<String> l2Nodes = store.getChildren(ledgersRoot + "/" +
curL1Nodes).get();
+ List<String> l2Nodes = store.getChildren(ledgersRoot + "/" +
curL1Nodes)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
l2NodesIter = l2Nodes.iterator();
if (!l2NodesIter.hasNext()) {
l2NodesIter = null;
@@ -94,7 +99,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements
LedgerManager.Ledg
boolean hasMoreElements = false;
try {
if (l1NodesIter == null) {
- List<String> l1Nodes =
store.getChildren(ledgersRoot).get();
+ List<String> l1Nodes = store.getChildren(ledgersRoot)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
l1NodesIter = l1Nodes.iterator();
hasMoreElements = nextL1Node();
} else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
@@ -102,7 +108,7 @@ public class LegacyHierarchicalLedgerRangeIterator
implements LedgerManager.Ledg
} else {
hasMoreElements = true;
}
- } catch (ExecutionException ke) {
+ } catch (ExecutionException | TimeoutException ke) {
throw new IOException("Error preloading next range", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -156,8 +162,8 @@ public class LegacyHierarchicalLedgerRangeIterator
implements LedgerManager.Ledg
String nodePath = nodeBuilder.toString();
List<String> ledgerNodes = null;
try {
- ledgerNodes = store.getChildren(nodePath).get();
- } catch (ExecutionException e) {
+ ledgerNodes =
store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ } catch (ExecutionException | TimeoutException e) {
throw new IOException("Error when get child nodes from zk", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
index 9a36ac53b89..3b32916e6e7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.StringUtils;
@@ -57,8 +59,9 @@ class LongHierarchicalLedgerRangeIterator implements
LedgerManager.LedgerRangeIt
*/
List<String> getChildrenAt(String path) throws IOException {
try {
- return store.getChildren(path).get();
- } catch (ExecutionException e) {
+ return store.getChildren(path)
+ .get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | TimeoutException e) {
if (log.isDebugEnabled()) {
log.debug("Failed to get children at {}", path);
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java
index a4336b87639..4444295b616 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.metadata.bookkeeper;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.bookkeeper.bookie.BookieException;
@@ -30,6 +33,7 @@ import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
class PulsarLayoutManager implements LayoutManager {
@Getter(AccessLevel.PACKAGE)
@@ -49,14 +53,14 @@ class PulsarLayoutManager implements LayoutManager {
@Override
public LedgerLayout readLedgerLayout() throws IOException {
try {
- byte[] layoutData = store.get(layoutPath).get()
+ byte[] layoutData =
store.get(layoutPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
.orElseThrow(() -> new
BookieException.MetadataStoreException("Layout node not found"))
.getValue();
return LedgerLayout.parseLayout(layoutData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
- } catch (BookieException | ExecutionException e) {
+ } catch (BookieException | ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}
@@ -66,10 +70,13 @@ class PulsarLayoutManager implements LayoutManager {
try {
byte[] layoutData = ledgerLayout.serialize();
- store.put(layoutPath, layoutData, Optional.of(-1L)).get();
+ store.put(layoutPath, layoutData, Optional.of(-1L))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
+ } catch (TimeoutException e) {
+ throw new IOException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof
MetadataStoreException.BadVersionException) {
throw new LedgerLayoutExistsException(e);
@@ -82,11 +89,12 @@ class PulsarLayoutManager implements LayoutManager {
@Override
public void deleteLedgerLayout() throws IOException {
try {
- store.delete(layoutPath, Optional.empty()).get();
+ store.delete(layoutPath, Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
- } catch (ExecutionException e) {
+ } catch (ExecutionException | TimeoutException e) {
throw new IOException(e);
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java
index 1b229757c9c..bfcbf0b22d9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java
@@ -19,8 +19,12 @@
package org.apache.pulsar.metadata.bookkeeper;
import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -110,7 +114,13 @@ public class PulsarLedgerManagerFactory implements
LedgerManagerFactory {
* before proceeding with nuking existing cluster, make sure there
* are no unexpected nodes under ledgersRootPath
*/
- List<String> ledgersRootPathChildrenList =
store.getChildren(ledgerRootPath).join();
+ final List<String> ledgersRootPathChildrenList;
+ try {
+ ledgersRootPathChildrenList = store.getChildren(ledgerRootPath)
+ .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new IOException(e);
+ }
for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
if
((!AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren))
&&
(!ledgerManager.isLedgerParentNode(ledgersRootPathChildren))) {
@@ -124,18 +134,34 @@ public class PulsarLedgerManagerFactory implements
LedgerManagerFactory {
format(conf, layoutManager);
// now delete all the special nodes recursively
- for (String ledgersRootPathChildren :
store.getChildren(ledgerRootPath).join()) {
- if
(AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren)) {
- store.deleteRecursive(ledgerRootPath + "/" +
ledgersRootPathChildren).join();
+ final List<String> ledgersRootPathChildren;
+ try {
+ ledgersRootPathChildren = store.getChildren(ledgerRootPath)
+ .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new IOException(e);
+ }
+ for (String ledgersRootPathChild :ledgersRootPathChildren) {
+ if (AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChild)) {
+ try {
+ store.deleteRecursive(ledgerRootPath + "/" +
ledgersRootPathChild)
+ .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new IOException(e);
+ }
} else {
log.error("Found unexpected node : {} under ledgersRootPath :
{} so exiting nuke operation",
- ledgersRootPathChildren, ledgerRootPath);
+ ledgersRootPathChild, ledgerRootPath);
return false;
}
}
// finally deleting the ledgers rootpath
- store.deleteRecursive(ledgerRootPath).join();
+ try {
+ store.deleteRecursive(ledgerRootPath).get(BLOCKING_CALL_TIMEOUT,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new IOException(e);
+ }
log.info("Successfully nuked existing cluster");
return true;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
index 79fdc44cb2b..1124090a98d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
@@ -19,12 +19,14 @@
package org.apache.pulsar.metadata.bookkeeper;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat;
import static
org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
import static org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
import static
org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
import static
org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
+import static
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import java.net.UnknownHostException;
@@ -41,6 +43,7 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -249,7 +252,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
try {
String path = getUrLedgerPath(ledgerId);
- Optional<GetResult> optRes = store.get(path).get();
+ Optional<GetResult> optRes =
store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (!optRes.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Ledger: {} is not marked underreplicated",
ledgerId);
@@ -270,7 +273,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
underreplicatedLedger.setCtime(ctime);
underreplicatedLedger.setReplicaList(replicaList);
return underreplicatedLedger;
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
throw new ReplicationException.UnavailableException("Error
contacting with metadata store", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -374,14 +377,16 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
public void acquireUnderreplicatedLedger(long ledgerId) throws
ReplicationException {
try {
internalAcquireUnderreplicatedLedger(ledgerId);
- } catch (ExecutionException | InterruptedException e) {
+ } catch (ExecutionException | TimeoutException | InterruptedException
e) {
throw new ReplicationException.UnavailableException("Failed to
acuire under-replicated ledger", e);
}
}
- private void internalAcquireUnderreplicatedLedger(long ledgerId) throws
ExecutionException, InterruptedException {
+ private void internalAcquireUnderreplicatedLedger(long ledgerId) throws
ExecutionException,
+ InterruptedException, TimeoutException {
String lockPath = getUrLedgerLockPath(urLockPath, ledgerId);
- store.put(lockPath, LOCK_DATA, Optional.of(-1L),
EnumSet.of(CreateOption.Ephemeral)).get();
+ store.put(lockPath, LOCK_DATA, Optional.of(-1L),
EnumSet.of(CreateOption.Ephemeral))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
@Override
@@ -392,7 +397,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
try {
Lock l = heldLocks.get(ledgerId);
if (l != null) {
- store.delete(getUrLedgerPath(ledgerId),
Optional.of(l.getLedgerNodeVersion())).get();
+ store.delete(getUrLedgerPath(ledgerId),
Optional.of(l.getLedgerNodeVersion()))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
} catch (ExecutionException ee) {
if (ee.getCause() instanceof
MetadataStoreException.NotFoundException) {
@@ -405,6 +411,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.error("Error deleting underreplicated ledger node", ee);
throw new ReplicationException.UnavailableException("Error
contacting metadata store", ee);
}
+ } catch (TimeoutException ex) {
+ throw new ReplicationException.UnavailableException("Error
contacting metadata store", ex);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted
while contacting metadata store", ie);
@@ -445,7 +453,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
while (queue.size() > 0 && curBatch.size() == 0) {
String parent = queue.remove();
try {
- for (String c : store.getChildren(parent).get()) {
+ for (String c :
store.getChildren(parent).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
String child = parent + "/" + c;
if (c.startsWith("urL")) {
long ledgerId = getLedgerId(child);
@@ -479,21 +487,23 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
}
private long getLedgerToRereplicateFromHierarchy(String parent, long depth)
- throws ExecutionException, InterruptedException {
+ throws ExecutionException, InterruptedException, TimeoutException {
if (depth == 4) {
- List<String> children = new
ArrayList<>(store.getChildren(parent).get());
+ List<String> children = new ArrayList<>(store.getChildren(parent)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
Collections.shuffle(children);
while (!children.isEmpty()) {
String tryChild = children.get(0);
try {
- List<String> locks = store.getChildren(urLockPath).get();
+ List<String> locks =
store.getChildren(urLockPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (locks.contains(tryChild)) {
children.remove(tryChild);
continue;
}
- Optional<GetResult> optRes = store.get(parent + "/" +
tryChild).get();
+ Optional<GetResult> optRes = store.get(parent + "/" +
tryChild)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (!optRes.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("{}/{} doesn't exist", parent, tryChild);
@@ -522,7 +532,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
return -1;
}
- List<String> children = new
ArrayList<>(store.getChildren(parent).join());
+ List<String> children = new
ArrayList<>(store.getChildren(parent).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
Collections.shuffle(children);
while (children.size() > 0) {
@@ -545,7 +555,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
}
try {
return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
throw new ReplicationException.UnavailableException("Error
contacting metadata store", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -571,7 +581,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
// nothing found, wait for a watcher to trigger
this.wait(1000);
}
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
throw new ReplicationException.UnavailableException("Error
contacting metadata store", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -597,7 +607,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
try {
Lock l = heldLocks.get(ledgerId);
if (l != null) {
- store.delete(l.getLockPath(), Optional.empty()).get();
+ store.delete(l.getLockPath(), Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
} catch (ExecutionException ee) {
if (ee.getCause() instanceof
MetadataStoreException.NotFoundException) {
@@ -606,6 +617,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.error("Error deleting underreplicated ledger lock", ee);
throw new ReplicationException.UnavailableException("Error
contacting metadata store", ee);
}
+ } catch (TimeoutException ex) {
+ throw new ReplicationException.UnavailableException("Error
contacting metadata store", ex);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted
while connecting metadata store", ie);
@@ -620,7 +633,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
}
try {
for (Map.Entry<Long, Lock> e : heldLocks.entrySet()) {
- store.delete(e.getValue().getLockPath(),
Optional.empty()).get();
+ store.delete(e.getValue().getLockPath(), Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
} catch (ExecutionException ee) {
if (ee.getCause() instanceof
MetadataStoreException.NotFoundException) {
@@ -629,6 +643,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.error("Error deleting underreplicated ledger lock", ee);
throw new ReplicationException.UnavailableException("Error
contacting metadata store", ee);
}
+ } catch (TimeoutException ex) {
+ throw new ReplicationException.UnavailableException("Error
contacting metadata store", ex);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted
while connecting metadata store", ie);
@@ -643,9 +659,10 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
}
try {
String path = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
- store.put(path, "".getBytes(UTF_8), Optional.of(-1L)).get();
+ store.put(path, "".getBytes(UTF_8), Optional.of(-1L))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Auto ledger re-replication is disabled!");
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Exception while stopping auto ledger re-replication",
ee);
throw new ReplicationException.UnavailableException(
"Exception while stopping auto ledger re-replication", ee);
@@ -663,9 +680,10 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
log.debug("enableLedegerReplication()");
}
try {
- store.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE,
Optional.empty()).get();
+ store.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE,
Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Resuming automatic ledger re-replication");
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Exception while resuming ledger replication", ee);
throw new ReplicationException.UnavailableException(
"Exception while resuming auto ledger re-replication", ee);
@@ -683,8 +701,9 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.debug("isLedgerReplicationEnabled()");
}
try {
- return !store.exists(basePath + '/' +
BookKeeperConstants.DISABLE_NODE).get();
- } catch (ExecutionException ee) {
+ return !store.exists(basePath + '/' +
BookKeeperConstants.DISABLE_NODE)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Error while checking the state of "
+ "ledger re-replication", ee);
throw new ReplicationException.UnavailableException(
@@ -708,13 +727,14 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
}
try {
- if (!store.exists(basePath + '/' +
BookKeeperConstants.DISABLE_NODE).get()) {
+ if (!store.exists(basePath + '/' +
BookKeeperConstants.DISABLE_NODE)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
log.info("LedgerReplication is enabled externally through
metadata store, "
+ "since DISABLE_NODE node is deleted");
cb.operationComplete(0, null);
return;
}
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Error while checking the state of "
+ "ledger re-replication", ee);
throw new ReplicationException.UnavailableException(
@@ -732,7 +752,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
@Override
public boolean isLedgerBeingReplicated(long ledgerId) throws
ReplicationException {
try {
- return store.exists(getUrLedgerLockPath(urLockPath,
ledgerId)).get();
+ return store.exists(getUrLedgerLockPath(urLockPath,
ledgerId)).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (Exception e) {
throw new ReplicationException.UnavailableException("Failed to
check if ledger is beinge replicated", e);
}
@@ -744,7 +764,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.debug("initializeLostBookieRecoveryDelay()");
try {
store.put(lostBookieRecoveryDelayPath,
Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
- Optional.of(-1L)).get();
+ Optional.of(-1L)).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (ExecutionException ee) {
if (ee.getCause() instanceof
MetadataStoreException.BadVersionException) {
log.info("lostBookieRecoveryDelay node is already present, so
using "
@@ -754,6 +774,9 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.error("Error while initializing LostBookieRecoveryDelay",
ee);
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
}
+ } catch (TimeoutException ex) {
+ log.error("Error while initializing LostBookieRecoveryDelay", ex);
+ throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ex);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted
while contacting zookeeper", ie);
@@ -767,9 +790,9 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.debug("setLostBookieRecoveryDelay()");
try {
store.put(lostBookieRecoveryDelayPath,
Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
- Optional.empty()).get();
+ Optional.empty()).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Error while setting LostBookieRecoveryDelay ", ee);
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
} catch (InterruptedException ie) {
@@ -782,9 +805,10 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
public int getLostBookieRecoveryDelay() throws
ReplicationException.UnavailableException {
log.debug("getLostBookieRecoveryDelay()");
try {
- byte[] data =
store.get(lostBookieRecoveryDelayPath).get().get().getValue();
+ byte[] data =
store.get(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
+ .get().getValue();
return Integer.parseInt(new String(data, UTF_8));
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Error while getting LostBookieRecoveryDelay ", ee);
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
} catch (InterruptedException ie) {
@@ -801,12 +825,12 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
lostBookieRecoveryDelayListener = cb;
}
try {
- if (!store.exists(lostBookieRecoveryDelayPath).get()) {
+ if
(!store.exists(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS)) {
cb.operationComplete(0, null);
return;
}
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Error while checking the state of
lostBookieRecoveryDelay", ee);
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
} catch (InterruptedException ie) {
@@ -820,7 +844,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
throws ReplicationException.UnavailableException {
try {
- Optional<GetResult> optRes =
store.get(getUrLedgerLockPath(urLockPath, ledgerId)).get();
+ Optional<GetResult> optRes =
store.get(getUrLedgerLockPath(urLockPath, ledgerId))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (!optRes.isPresent()) {
// this is ok.
return null;
@@ -831,7 +856,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
TextFormat.merge(new String(lockData, UTF_8), lockDataBuilder);
LockDataFormat lock = lockDataBuilder.build();
return lock.getBookieId();
- } catch (ExecutionException e) {
+ } catch (ExecutionException | TimeoutException e) {
log.error("Error while getting ReplicationWorkerId rereplicating
Ledger", e);
throw new ReplicationException.UnavailableException(
"Error while getting ReplicationWorkerId rereplicating
Ledger", e);
@@ -855,8 +880,9 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
builder.setCheckAllLedgersCTime(checkAllLedgersCTime);
byte[] checkAllLedgersFormatByteArray =
builder.build().toByteArray();
- store.put(checkAllLedgersCtimePath,
checkAllLedgersFormatByteArray, Optional.empty()).get();
- } catch (ExecutionException ee) {
+ store.put(checkAllLedgersCtimePath,
checkAllLedgersFormatByteArray, Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ } catch (ExecutionException | TimeoutException ee) {
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -870,7 +896,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.debug("setCheckAllLedgersCTime");
}
try {
- Optional<GetResult> optRes =
store.get(checkAllLedgersCtimePath).get();
+ Optional<GetResult> optRes =
store.get(checkAllLedgersCtimePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (!optRes.isPresent()) {
log.warn("checkAllLedgersCtimeZnode is not yet available");
return -1;
@@ -879,7 +905,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
CheckAllLedgersFormat checkAllLedgersFormat =
CheckAllLedgersFormat.parseFrom(data);
return checkAllLedgersFormat.hasCheckAllLedgersCTime() ?
checkAllLedgersFormat.getCheckAllLedgersCTime()
: -1;
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -899,8 +925,9 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
PlacementPolicyCheckFormat.Builder builder =
PlacementPolicyCheckFormat.newBuilder();
builder.setPlacementPolicyCheckCTime(placementPolicyCheckCTime);
byte[] placementPolicyCheckFormatByteArray =
builder.build().toByteArray();
- store.put(placementPolicyCheckCtimePath,
placementPolicyCheckFormatByteArray, Optional.empty()).get();
- } catch (ExecutionException ke) {
+ store.put(placementPolicyCheckCtimePath,
placementPolicyCheckFormatByteArray, Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ } catch (ExecutionException | TimeoutException ke) {
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -914,7 +941,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.debug("getPlacementPolicyCheckCTime");
}
try {
- Optional<GetResult> optRes =
store.get(placementPolicyCheckCtimePath).get();
+ Optional<GetResult> optRes =
store.get(placementPolicyCheckCtimePath)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (!optRes.isPresent()) {
log.warn("placementPolicyCheckCtimeZnode is not yet
available");
return -1;
@@ -923,7 +951,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
PlacementPolicyCheckFormat placementPolicyCheckFormat =
PlacementPolicyCheckFormat.parseFrom(data);
return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime()
?
placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1;
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -939,11 +967,12 @@ public class PulsarLedgerUnderreplicationManager
implements LedgerUnderreplicati
ReplicasCheckFormat.Builder builder =
ReplicasCheckFormat.newBuilder();
builder.setReplicasCheckCTime(replicasCheckCTime);
byte[] replicasCheckFormatByteArray =
builder.build().toByteArray();
- store.put(replicasCheckCtimePath, replicasCheckFormatByteArray,
Optional.empty()).get();
+ store.put(replicasCheckCtimePath, replicasCheckFormatByteArray,
Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (log.isDebugEnabled()) {
log.debug("setReplicasCheckCTime completed successfully");
}
- } catch (ExecutionException ke) {
+ } catch (ExecutionException | TimeoutException ke) {
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -954,7 +983,8 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
@Override
public long getReplicasCheckCTime() throws
ReplicationException.UnavailableException {
try {
- Optional<GetResult> optRes =
store.get(replicasCheckCtimePath).get();
+ Optional<GetResult> optRes = store.get(replicasCheckCtimePath)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (!optRes.isPresent()) {
log.warn("placementPolicyCheckCtimeZnode is not yet
available");
return -1;
@@ -965,7 +995,7 @@ public class PulsarLedgerUnderreplicationManager implements
LedgerUnderreplicati
log.debug("getReplicasCheckCTime completed successfully");
}
return replicasCheckFormat.hasReplicasCheckCTime() ?
replicasCheckFormat.getReplicasCheckCTime() : -1;
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
throw new ReplicationException.UnavailableException("Error
contacting zookeeper", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
index 25c3f10aa18..c6aba6b7d93 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
@@ -19,10 +19,12 @@
package org.apache.pulsar.metadata.bookkeeper;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import static
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,8 +34,8 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -85,12 +87,11 @@ public class PulsarRegistrationManager implements
RegistrationManager {
}
@Override
- @SneakyThrows
public void close() {
for (ResourceLock<BookieServiceInfo> rwBookie :
bookieRegistration.values()) {
try {
- rwBookie.release().get();
- } catch (ExecutionException ignore) {
+ rwBookie.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ } catch (ExecutionException | TimeoutException ignore) {
log.error("Cannot release correctly {}", rwBookie,
ignore.getCause());
} catch (InterruptedException ignore) {
log.error("Cannot release correctly {}", rwBookie, ignore);
@@ -100,26 +101,30 @@ public class PulsarRegistrationManager implements
RegistrationManager {
for (ResourceLock<BookieServiceInfo> roBookie :
bookieRegistrationReadOnly.values()) {
try {
- roBookie.release().get();
- } catch (ExecutionException ignore) {
+ roBookie.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ } catch (ExecutionException | TimeoutException ignore) {
log.error("Cannot release correctly {}", roBookie,
ignore.getCause());
} catch (InterruptedException ignore) {
log.error("Cannot release correctly {}", roBookie, ignore);
Thread.currentThread().interrupt();
}
}
- coordinationService.close();
+ try {
+ coordinationService.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public String getClusterInstanceId() throws BookieException {
try {
return store.get(ledgersRootPath + "/" + INSTANCEID)
- .get()
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
.map(res -> new String(res.getValue(), UTF_8))
.orElseThrow(
() -> new
BookieException.MetadataStoreException("BookKeeper cluster not initialized"));
- } catch (ExecutionException | InterruptedException e) {
+ } catch (ExecutionException | InterruptedException | TimeoutException
e) {
throw new BookieException.MetadataStoreException("Failed to get
cluster instance id", e);
}
}
@@ -136,22 +141,24 @@ public class PulsarRegistrationManager implements
RegistrationManager {
ResourceLock<BookieServiceInfo> rwRegistration =
bookieRegistration.remove(bookieId);
if (rwRegistration != null) {
log.info("Bookie {} was already registered as writable,
unregistering", bookieId);
- rwRegistration.release().get();
+ rwRegistration.release().get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS);
}
bookieRegistrationReadOnly.put(bookieId,
- lockManager.acquireLock(regPathReadOnly,
bookieServiceInfo).get());
+ lockManager.acquireLock(regPathReadOnly,
bookieServiceInfo)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
} else {
ResourceLock<BookieServiceInfo> roRegistration =
bookieRegistrationReadOnly.remove(bookieId);
if (roRegistration != null) {
log.info("Bookie {} was already registered as read-only,
unregistering", bookieId);
- roRegistration.release().get();
+ roRegistration.release().get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS);
}
bookieRegistration.put(bookieId,
- lockManager.acquireLock(regPath,
bookieServiceInfo).get());
+ lockManager.acquireLock(regPath, bookieServiceInfo)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
}
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Exception registering ephemeral node for Bookie!", ee);
// Throw an IOException back up. This will cause the Bookie
// constructor to error out. Alternatively, we could do a System
@@ -173,18 +180,18 @@ public class PulsarRegistrationManager implements
RegistrationManager {
if (readOnly) {
ResourceLock<BookieServiceInfo> roRegistration =
bookieRegistrationReadOnly.get(bookieId);
if (roRegistration != null) {
- roRegistration.release().get();
+ roRegistration.release().get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS);
}
} else {
ResourceLock<BookieServiceInfo> rwRegistration =
bookieRegistration.get(bookieId);
if (rwRegistration != null) {
- rwRegistration.release().get();
+ rwRegistration.release().get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS);
}
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new BookieException.MetadataStoreException(ie);
- } catch (ExecutionException e) {
+ } catch (ExecutionException | TimeoutException e) {
throw new BookieException.MetadataStoreException(e);
}
}
@@ -195,8 +202,9 @@ public class PulsarRegistrationManager implements
RegistrationManager {
String readonlyRegPath = bookieReadonlyRegistrationPath + "/" +
bookieId;
try {
- return (store.exists(regPath).get() ||
store.exists(readonlyRegPath).get());
- } catch (ExecutionException e) {
+ return (store.exists(regPath).get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS)
+ ||
store.exists(readonlyRegPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
+ } catch (ExecutionException | TimeoutException e) {
log.error("Exception while checking registration ephemeral nodes
for BookieId: {}", bookieId, e);
throw new BookieException.MetadataStoreException(e);
} catch (InterruptedException e) {
@@ -222,7 +230,8 @@ public class PulsarRegistrationManager implements
RegistrationManager {
version = ((LongVersion)
cookieData.getVersion()).getLongVersion();
}
- store.put(path, cookieData.getValue(), Optional.of(version)).get();
+ store.put(path, cookieData.getValue(), Optional.of(version))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new BookieException.MetadataStoreException("Interrupted
writing cookie for bookie " + bookieId, ie);
@@ -232,6 +241,8 @@ public class PulsarRegistrationManager implements
RegistrationManager {
} else {
throw new BookieException.MetadataStoreException("Failed to
write cookie for bookie " + bookieId);
}
+ } catch (TimeoutException ex) {
+ throw new BookieException.MetadataStoreException("Failed to write
cookie for bookie " + bookieId, ex);
}
}
@@ -239,7 +250,7 @@ public class PulsarRegistrationManager implements
RegistrationManager {
public Versioned<byte[]> readCookie(BookieId bookieId) throws
BookieException {
String path = this.cookiePath + "/" + bookieId;
try {
- Optional<GetResult> res = store.get(path).get();
+ Optional<GetResult> res =
store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
if (!res.isPresent()) {
throw new
BookieException.CookieNotFoundException(bookieId.toString());
}
@@ -250,7 +261,7 @@ public class PulsarRegistrationManager implements
RegistrationManager {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new BookieException.MetadataStoreException(ie);
- } catch (ExecutionException e) {
+ } catch (ExecutionException | TimeoutException e) {
throw new BookieException.MetadataStoreException(e);
}
}
@@ -259,7 +270,8 @@ public class PulsarRegistrationManager implements
RegistrationManager {
public void removeCookie(BookieId bookieId, Version version) throws
BookieException {
String path = this.cookiePath + "/" + bookieId;
try {
- store.delete(path, Optional.of(((LongVersion)
version).getLongVersion())).get();
+ store.delete(path, Optional.of(((LongVersion)
version).getLongVersion()))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BookieException.MetadataStoreException("Interrupted
deleting cookie for bookie " + bookieId, e);
@@ -269,6 +281,8 @@ public class PulsarRegistrationManager implements
RegistrationManager {
} else {
throw new BookieException.MetadataStoreException("Failed to
delete cookie for bookie " + bookieId);
}
+ } catch (TimeoutException ex) {
+ throw new BookieException.MetadataStoreException("Failed to delete
cookie for bookie " + bookieId);
}
log.info("Removed cookie from {} for bookie {}.", cookiePath,
bookieId);
@@ -276,20 +290,23 @@ public class PulsarRegistrationManager implements
RegistrationManager {
@Override
public boolean prepareFormat() throws Exception {
- boolean ledgerRootExists = store.exists(ledgersRootPath).get();
- boolean availableNodeExists =
store.exists(bookieRegistrationPath).get();
+ boolean ledgerRootExists =
store.exists(ledgersRootPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ boolean availableNodeExists =
store.exists(bookieRegistrationPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
// Create ledgers root node if not exists
if (!ledgerRootExists) {
- store.put(ledgersRootPath, new byte[0], Optional.empty()).get();
+ store.put(ledgersRootPath, new byte[0], Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
// create available bookies node if not exists
if (!availableNodeExists) {
- store.put(bookieRegistrationPath, new byte[0],
Optional.empty()).get();
+ store.put(bookieRegistrationPath, new byte[0], Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
// create readonly bookies node if not exists
- if (!store.exists(bookieReadonlyRegistrationPath).get()) {
- store.put(bookieReadonlyRegistrationPath, new byte[0],
Optional.empty()).get();
+ if
(!store.exists(bookieReadonlyRegistrationPath).get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS)) {
+ store.put(bookieReadonlyRegistrationPath, new byte[0],
Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
return ledgerRootExists;
@@ -301,16 +318,18 @@ public class PulsarRegistrationManager implements
RegistrationManager {
log.info("Initializing metadata for new cluster, ledger root path: {}",
ledgersRootPath);
- if (store.exists(instanceIdPath).get()) {
+ if (store.exists(instanceIdPath).get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS)) {
log.error("Ledger root path: {} already exists", ledgersRootPath);
return false;
}
- store.put(ledgersRootPath, new byte[0], Optional.empty()).get();
+ store.put(ledgersRootPath, new byte[0], Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
// create INSTANCEID
String instanceId = UUID.randomUUID().toString();
- store.put(instanceIdPath, instanceId.getBytes(UTF_8),
Optional.of(-1L)).join();
+ store.put(instanceIdPath, instanceId.getBytes(UTF_8), Optional.of(-1L))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Successfully initiated cluster. ledger root path: {}
instanceId: {}",
ledgersRootPath, instanceId);
@@ -321,23 +340,28 @@ public class PulsarRegistrationManager implements
RegistrationManager {
public boolean format() throws Exception {
// Clear underreplicated ledgers
store.deleteRecursive(PulsarLedgerUnderreplicationManager.getBasePath(ledgersRootPath)
- + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH).get();
+ +
BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
// Clear underreplicatedledger locks
-
store.deleteRecursive(PulsarLedgerUnderreplicationManager.getUrLockPath(ledgersRootPath)).get();
+
store.deleteRecursive(PulsarLedgerUnderreplicationManager.getUrLockPath(ledgersRootPath))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
// Clear the cookies
- store.deleteRecursive(cookiePath).get();
+ store.deleteRecursive(cookiePath).get(BLOCKING_CALL_TIMEOUT,
MILLISECONDS);
// Clear the INSTANCEID
- if (store.exists(ledgersRootPath + "/" +
BookKeeperConstants.INSTANCEID).get()) {
- store.delete(ledgersRootPath + "/" +
BookKeeperConstants.INSTANCEID, Optional.empty()).get();
+ if (store.exists(ledgersRootPath + "/" +
BookKeeperConstants.INSTANCEID)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
+ store.delete(ledgersRootPath + "/" +
BookKeeperConstants.INSTANCEID, Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
}
// create INSTANCEID
String instanceId = UUID.randomUUID().toString();
store.put(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID,
- instanceId.getBytes(StandardCharsets.UTF_8),
Optional.of(-1L)).get();
+ instanceId.getBytes(StandardCharsets.UTF_8), Optional.of(-1L))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Successfully formatted BookKeeper metadata");
return true;
@@ -347,7 +371,7 @@ public class PulsarRegistrationManager implements
RegistrationManager {
public boolean nukeExistingCluster() throws Exception {
log.info("Nuking metadata of existing cluster, ledger root path: {}",
ledgersRootPath);
- if (!store.exists(ledgersRootPath + "/" + INSTANCEID).join()) {
+ if (!store.exists(ledgersRootPath + "/" +
INSTANCEID).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
log.info("There is no existing cluster with ledgersRootPath: {},
so exiting nuke operation",
ledgersRootPath);
return true;
@@ -356,17 +380,19 @@ public class PulsarRegistrationManager implements
RegistrationManager {
@Cleanup
RegistrationClient registrationClient = new
PulsarRegistrationClient(store, ledgersRootPath);
- Collection<BookieId> rwBookies =
registrationClient.getWritableBookies().join().getValue();
+ Collection<BookieId> rwBookies =
registrationClient.getWritableBookies()
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS).getValue();
if (rwBookies != null && !rwBookies.isEmpty()) {
log.error("Bookies are still up and connected to this cluster, "
- + "stop all bookies before nuking the cluster");
+ + "stop all bookies before nuking the cluster");
return false;
}
- Collection<BookieId> roBookies =
registrationClient.getReadOnlyBookies().join().getValue();
+ Collection<BookieId> roBookies =
registrationClient.getReadOnlyBookies()
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS).getValue();
if (roBookies != null && !roBookies.isEmpty()) {
log.error("Readonly Bookies are still up and connected to this
cluster, "
- + "stop all bookies before nuking the cluster");
+ + "stop all bookies before nuking the cluster");
return false;
}