This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new eddf3956318 [fix] [broker] Fix race-condition causing repeated delete topic (#23522)
eddf3956318 is described below

commit eddf395631811a731fe9c0284b44fd2f6efd2026
Author: fengyubiao <[email protected]>
AuthorDate: Tue Oct 29 16:01:57 2024 +0800

    [fix] [broker] Fix race-condition causing repeated delete topic (#23522)
    
    (cherry picked from commit 7b80f019fa86cf9e154e7dfcd3fd3dc1d036cbba)
---
 .../java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java |  9 ++++++---
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 +++++++++-
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 1bc2d2b04be..64e6f86d905 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -413,10 +413,13 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
     }
 
     private static MetaStoreException getException(Throwable t) {
-        if (t.getCause() instanceof 
MetadataStoreException.BadVersionException) {
-            return new 
ManagedLedgerException.BadVersionException(t.getMessage());
+        Throwable actEx = FutureUtil.unwrapCompletionException(t);
+        if (actEx instanceof MetadataStoreException.BadVersionException 
badVersionException) {
+            return new 
ManagedLedgerException.BadVersionException(badVersionException);
+        } else if (actEx instanceof MetaStoreException metaStoreException){
+            return metaStoreException;
         } else {
-            return new MetaStoreException(t);
+            return new MetaStoreException(actEx);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7faa86e5705..14a5e9f46b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1432,6 +1432,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             // Mark the progress of close to prevent close calling 
concurrently.
             this.closeFutures = new CloseFutures(new CompletableFuture(), new 
CompletableFuture());
 
+            AtomicBoolean alreadyUnFenced = new AtomicBoolean();
             CompletableFuture<Void> res = 
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
                         
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () 
-> {
                 CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
@@ -1448,6 +1449,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     closeClientFuture.complete(null);
                 }, getOrderedExecutor()).exceptionally(ex -> {
                     log.error("[{}] Error closing clients", topic, ex);
+                    alreadyUnFenced.set(true);
                     unfenceTopicToResume();
                     closeClientFuture.completeExceptionally(ex);
                     return null;
@@ -1470,6 +1472,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 .whenComplete((v, ex) -> {
                                     if (ex != null) {
                                         log.error("[{}] Error deleting topic", 
topic, ex);
+                                        alreadyUnFenced.set(true);
                                         unfenceTopicToResume();
                                         deleteFuture.completeExceptionally(ex);
                                     } else {
@@ -1479,6 +1482,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                     
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                                         if (e != null) {
                                             log.error("[{}] Error deleting 
topic", topic, e);
+                                            alreadyUnFenced.set(true);
                                             unfenceTopicToResume();
                                             
deleteFuture.completeExceptionally(e);
                                         } else {
@@ -1509,6 +1513,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                     } else {
                                                         log.error("[{}] Error 
deleting topic",
                                                                 topic, 
exception);
+                                                        
alreadyUnFenced.set(true);
                                                         unfenceTopicToResume();
                                                         
deleteFuture.completeExceptionally(
                                                                 new 
PersistenceException(exception));
@@ -1521,6 +1526,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 }
                             });
                 }).exceptionally(ex->{
+                    alreadyUnFenced.set(true);
                     unfenceTopicToResume();
                     deleteFuture.completeExceptionally(
                             new TopicBusyException("Failed to close clients 
before deleting topic.",
@@ -1532,7 +1538,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 }).whenComplete((value, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Error deleting topic", topic, ex);
-                        unfenceTopicToResume();
+                        if (!alreadyUnFenced.get()) {
+                            unfenceTopicToResume();
+                        }
                     }
                 });
 

Reply via email to