This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a12ae38ad98 [fix] [broker] Fix race-condition causing repeated delete 
topic (#23522)
a12ae38ad98 is described below

commit a12ae38ad983ae005f93be6977badae565f5e703
Author: fengyubiao <[email protected]>
AuthorDate: Tue Oct 29 16:01:57 2024 +0800

    [fix] [broker] Fix race-condition causing repeated delete topic (#23522)
    
    (cherry picked from commit 7b80f019fa86cf9e154e7dfcd3fd3dc1d036cbba)
---
 .../java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java |  9 ++++++---
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 +++++++++-
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index d9269ec83b1..e47443e4e8f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -398,10 +398,13 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
     }
 
     private static MetaStoreException getException(Throwable t) {
-        if (t.getCause() instanceof 
MetadataStoreException.BadVersionException) {
-            return new 
ManagedLedgerException.BadVersionException(t.getMessage());
+        Throwable actEx = FutureUtil.unwrapCompletionException(t);
+        if (actEx instanceof MetadataStoreException.BadVersionException 
badVersionException) {
+            return new 
ManagedLedgerException.BadVersionException(badVersionException);
+        } else if (actEx instanceof MetaStoreException metaStoreException){
+            return metaStoreException;
         } else {
-            return new MetaStoreException(t);
+            return new MetaStoreException(actEx);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5b6d551ac8b..92194af6b36 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1455,6 +1455,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             this.closeFutures =
                     new CloseFutures(new CompletableFuture(), new 
CompletableFuture(), new CompletableFuture());
 
+            AtomicBoolean alreadyUnFenced = new AtomicBoolean();
             CompletableFuture<Void> res = 
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
                         
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () 
-> {
                 CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
@@ -1471,6 +1472,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     closeClientFuture.complete(null);
                 }, getOrderedExecutor()).exceptionally(ex -> {
                     log.error("[{}] Error closing clients", topic, ex);
+                    alreadyUnFenced.set(true);
                     unfenceTopicToResume();
                     closeClientFuture.completeExceptionally(ex);
                     return null;
@@ -1493,6 +1495,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 .whenComplete((v, ex) -> {
                                     if (ex != null) {
                                         log.error("[{}] Error deleting topic", 
topic, ex);
+                                        alreadyUnFenced.set(true);
                                         unfenceTopicToResume();
                                         deleteFuture.completeExceptionally(ex);
                                     } else {
@@ -1502,6 +1505,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                     
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                                         if (e != null) {
                                             log.error("[{}] Error deleting 
topic", topic, e);
+                                            alreadyUnFenced.set(true);
                                             unfenceTopicToResume();
                                             
deleteFuture.completeExceptionally(e);
                                         } else {
@@ -1532,6 +1536,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                     } else {
                                                         log.error("[{}] Error 
deleting topic",
                                                                 topic, 
exception);
+                                                        
alreadyUnFenced.set(true);
                                                         unfenceTopicToResume();
                                                         
deleteFuture.completeExceptionally(
                                                                 new 
PersistenceException(exception));
@@ -1544,6 +1549,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 }
                             });
                 }).exceptionally(ex->{
+                    alreadyUnFenced.set(true);
                     unfenceTopicToResume();
                     deleteFuture.completeExceptionally(
                             new TopicBusyException("Failed to close clients 
before deleting topic.",
@@ -1555,7 +1561,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 }).whenComplete((value, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Error deleting topic", topic, ex);
-                        unfenceTopicToResume();
+                        if (!alreadyUnFenced.get()) {
+                            unfenceTopicToResume();
+                        }
                     }
                 });
 

Reply via email to