This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b80f019fa8 [fix] [broker] Fix race-condition causing repeated delete topic (#23522)
7b80f019fa8 is described below

commit 7b80f019fa86cf9e154e7dfcd3fd3dc1d036cbba
Author: fengyubiao <[email protected]>
AuthorDate: Tue Oct 29 16:01:57 2024 +0800

    [fix] [broker] Fix race-condition causing repeated delete topic (#23522)
---
 .../java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java |  9 ++++++---
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 +++++++++-
 .../broker/SameAuthParamsLookupAutoClusterFailoverTest.java    |  1 -
 .../DisabledCreateTopicToRemoteClusterForReplicationTest.java  |  2 +-
 .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java |  2 +-
 .../broker/service/OneWayReplicatorUsingGlobalZKTest.java      |  2 +-
 .../org/apache/pulsar/broker/service/ReplicationTxnTest.java   |  2 +-
 7 files changed, 19 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index d9269ec83b1..e47443e4e8f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -398,10 +398,13 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
     }
 
     private static MetaStoreException getException(Throwable t) {
-        if (t.getCause() instanceof 
MetadataStoreException.BadVersionException) {
-            return new 
ManagedLedgerException.BadVersionException(t.getMessage());
+        Throwable actEx = FutureUtil.unwrapCompletionException(t);
+        if (actEx instanceof MetadataStoreException.BadVersionException 
badVersionException) {
+            return new 
ManagedLedgerException.BadVersionException(badVersionException);
+        } else if (actEx instanceof MetaStoreException metaStoreException){
+            return metaStoreException;
         } else {
-            return new MetaStoreException(t);
+            return new MetaStoreException(actEx);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c86a99de0f..541c8a7a225 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1465,6 +1465,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             this.closeFutures =
                     new CloseFutures(new CompletableFuture(), new 
CompletableFuture(), new CompletableFuture());
 
+            AtomicBoolean alreadyUnFenced = new AtomicBoolean();
             CompletableFuture<Void> res = 
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
                         
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () 
-> {
                 CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
@@ -1488,6 +1489,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     }
                 }).exceptionally(ex -> {
                     log.error("[{}] Error closing clients", topic, ex);
+                    alreadyUnFenced.set(true);
                     unfenceTopicToResume();
                     closeClientFuture.completeExceptionally(ex);
                     return null;
@@ -1503,6 +1505,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 .whenComplete((v, ex) -> {
                                     if (ex != null) {
                                         log.error("[{}] Error deleting topic", 
topic, ex);
+                                        alreadyUnFenced.set(true);
                                         unfenceTopicToResume();
                                         deleteFuture.completeExceptionally(ex);
                                     } else {
@@ -1512,6 +1515,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                     
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
                                         if (e != null) {
                                             log.error("[{}] Error deleting 
topic", topic, e);
+                                            alreadyUnFenced.set(true);
                                             unfenceTopicToResume();
                                             
deleteFuture.completeExceptionally(e);
                                         } else {
@@ -1542,6 +1546,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                     } else {
                                                         log.error("[{}] Error 
deleting topic",
                                                                 topic, 
exception);
+                                                        
alreadyUnFenced.set(true);
                                                         unfenceTopicToResume();
                                                         
deleteFuture.completeExceptionally(
                                                                 new 
PersistenceException(exception));
@@ -1554,6 +1559,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 }
                             });
                 }).exceptionally(ex->{
+                    alreadyUnFenced.set(true);
                     unfenceTopicToResume();
                     deleteFuture.completeExceptionally(
                             new TopicBusyException("Failed to close clients 
before deleting topic.",
@@ -1565,7 +1571,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 }).whenComplete((value, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Error deleting topic", topic, ex);
-                        unfenceTopicToResume();
+                        if (!alreadyUnFenced.get()) {
+                            unfenceTopicToResume();
+                        }
                     }
                 });
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
index fb19ed1ddbb..b39f8135e0e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -42,7 +42,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-@Test(groups = "flaky")
 public class SameAuthParamsLookupAutoClusterFailoverTest extends 
OneWayReplicatorTestBase {
 
     public void setup() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
index 0246d16b23d..0f8db4aaa73 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
@@ -44,7 +44,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
 public class DisabledCreateTopicToRemoteClusterForReplicationTest extends 
OneWayReplicatorTestBase {
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index f2631f252ab..a8f8d7ecbbd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -104,7 +104,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
 public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 827ad78fb26..ad877e8f947 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -44,7 +44,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
 public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
index 9fe01617699..bd4a0889c73 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
@@ -59,7 +59,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
 public class ReplicationTxnTest extends OneWayReplicatorTestBase {
 
     private boolean transactionBufferSegmentedSnapshotEnabled = false;

Reply via email to