This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new eddf3956318 [fix] [broker] Fix race-condition causing repeated delete topic (#23522)
eddf3956318 is described below
commit eddf395631811a731fe9c0284b44fd2f6efd2026
Author: fengyubiao <[email protected]>
AuthorDate: Tue Oct 29 16:01:57 2024 +0800
[fix] [broker] Fix race-condition causing repeated delete topic (#23522)
(cherry picked from commit 7b80f019fa86cf9e154e7dfcd3fd3dc1d036cbba)
---
.../java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java | 9 ++++++---
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 +++++++++-
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 1bc2d2b04be..64e6f86d905 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -413,10 +413,13 @@ public class MetaStoreImpl implements MetaStore, Consumer<Notification> {
}
private static MetaStoreException getException(Throwable t) {
- if (t.getCause() instanceof MetadataStoreException.BadVersionException) {
- return new ManagedLedgerException.BadVersionException(t.getMessage());
+ Throwable actEx = FutureUtil.unwrapCompletionException(t);
+ if (actEx instanceof MetadataStoreException.BadVersionException badVersionException) {
+ return new ManagedLedgerException.BadVersionException(badVersionException);
+ } else if (actEx instanceof MetaStoreException metaStoreException){
+ return metaStoreException;
} else {
- return new MetaStoreException(t);
+ return new MetaStoreException(actEx);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7faa86e5705..14a5e9f46b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1432,6 +1432,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Mark the progress of close to prevent close calling concurrently.
this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture());
+ AtomicBoolean alreadyUnFenced = new AtomicBoolean();
CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
@@ -1448,6 +1449,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
closeClientFuture.complete(null);
}, getOrderedExecutor()).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
@@ -1470,6 +1472,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
@@ -1479,6 +1482,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting topic", topic, e);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
@@ -1509,6 +1513,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
} else {
log.error("[{}] Error deleting topic",
topic, exception);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new PersistenceException(exception));
@@ -1521,6 +1526,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
});
}).exceptionally(ex->{
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic.",
@@ -1532,7 +1538,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}).whenComplete((value, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
- unfenceTopicToResume();
+ if (!alreadyUnFenced.get()) {
+ unfenceTopicToResume();
+ }
}
});