This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d41077dc876 [fix][broker] Fix ownership loss (#23515)
d41077dc876 is described below
commit d41077dc8766840f8f398f73f4609b65d9001ce1
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Nov 7 11:49:28 2024 +0800
[fix][broker] Fix ownership loss (#23515)
Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit 576d34144c2dd44ed2eb0ce0b2babdf95ade874b)
---
.../apache/pulsar/broker/namespace/OwnershipCache.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 9a4534f5387..868ed2d9fc2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -170,7 +170,22 @@ public class OwnershipCache {
// If we're not the owner, we need to check if anybody else is
String path = ServiceUnitUtils.path(suName);
- return lockManager.readLock(path);
+ return lockManager.readLock(path).thenCompose(owner -> {
+ // If the current broker is the owner, attempt to reacquire
ownership to avoid cache loss.
+ if (owner.isPresent() && owner.get().equals(selfOwnerInfo)) {
+ log.warn("Detected ownership loss for broker [{}] on namespace
bundle [{}]. "
+ + "Attempting to reacquire ownership to
maintain cache consistency.",
+ selfOwnerInfo, suName);
+ try {
+ return
tryAcquiringOwnership(suName).thenApply(Optional::ofNullable);
+ } catch (Exception e) {
+ log.error("Failed to reacquire ownership for namespace
bundle [{}] on broker [{}]: {}",
+ suName, selfOwnerInfo, e.getMessage(), e);
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+ return CompletableFuture.completedFuture(owner);
+ });
}
/**