This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d41077dc876 [fix][broker] Fix ownership loss (#23515)
d41077dc876 is described below

commit d41077dc8766840f8f398f73f4609b65d9001ce1
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Nov 7 11:49:28 2024 +0800

    [fix][broker] Fix ownership loss (#23515)
    
    Signed-off-by: Zixuan Liu <[email protected]>
    (cherry picked from commit 576d34144c2dd44ed2eb0ce0b2babdf95ade874b)
---
 .../apache/pulsar/broker/namespace/OwnershipCache.java  | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 9a4534f5387..868ed2d9fc2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -170,7 +170,22 @@ public class OwnershipCache {
 
         // If we're not the owner, we need to check if anybody else is
         String path = ServiceUnitUtils.path(suName);
-        return lockManager.readLock(path);
+        return lockManager.readLock(path).thenCompose(owner -> {
+            // If the current broker is the owner, attempt to reacquire 
ownership to avoid cache loss.
+            if (owner.isPresent() && owner.get().equals(selfOwnerInfo)) {
+                log.warn("Detected ownership loss for broker [{}] on namespace 
bundle [{}]. "
+                                + "Attempting to reacquire ownership to 
maintain cache consistency.",
+                        selfOwnerInfo, suName);
+                try {
+                    return 
tryAcquiringOwnership(suName).thenApply(Optional::ofNullable);
+                } catch (Exception e) {
+                    log.error("Failed to reacquire ownership for namespace 
bundle [{}] on broker [{}]: {}",
+                            suName, selfOwnerInfo, e.getMessage(), e);
+                    return CompletableFuture.failedFuture(e);
+                }
+            }
+            return CompletableFuture.completedFuture(owner);
+        });
     }
 
     /**

Reply via email to