This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new a12ae38ad98 [fix] [broker] Fix race-condition causing repeated delete topic (#23522)
a12ae38ad98 is described below
commit a12ae38ad983ae005f93be6977badae565f5e703
Author: fengyubiao <[email protected]>
AuthorDate: Tue Oct 29 16:01:57 2024 +0800
[fix] [broker] Fix race-condition causing repeated delete topic (#23522)
(cherry picked from commit 7b80f019fa86cf9e154e7dfcd3fd3dc1d036cbba)
---
.../java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java | 9 ++++++---
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 +++++++++-
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index d9269ec83b1..e47443e4e8f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -398,10 +398,13 @@ public class MetaStoreImpl implements MetaStore, Consumer<Notification> {
}
private static MetaStoreException getException(Throwable t) {
- if (t.getCause() instanceof
MetadataStoreException.BadVersionException) {
- return new
ManagedLedgerException.BadVersionException(t.getMessage());
+ Throwable actEx = FutureUtil.unwrapCompletionException(t);
+ if (actEx instanceof MetadataStoreException.BadVersionException
badVersionException) {
+ return new
ManagedLedgerException.BadVersionException(badVersionException);
+ } else if (actEx instanceof MetaStoreException metaStoreException){
+ return metaStoreException;
} else {
- return new MetaStoreException(t);
+ return new MetaStoreException(actEx);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5b6d551ac8b..92194af6b36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1455,6 +1455,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.closeFutures =
new CloseFutures(new CompletableFuture(), new
CompletableFuture(), new CompletableFuture());
+ AtomicBoolean alreadyUnFenced = new AtomicBoolean();
CompletableFuture<Void> res =
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), ()
-> {
CompletableFuture<Void> deleteFuture = new
CompletableFuture<>();
@@ -1471,6 +1472,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
closeClientFuture.complete(null);
}, getOrderedExecutor()).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
@@ -1493,6 +1495,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic",
topic, ex);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
@@ -1502,6 +1505,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting
topic", topic, e);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
@@ -1532,6 +1536,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
} else {
log.error("[{}] Error
deleting topic",
topic,
exception);
+
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new
PersistenceException(exception));
@@ -1544,6 +1549,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
});
}).exceptionally(ex->{
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients
before deleting topic.",
@@ -1555,7 +1561,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}).whenComplete((value, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
- unfenceTopicToResume();
+ if (!alreadyUnFenced.get()) {
+ unfenceTopicToResume();
+ }
}
});