This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b80f019fa8 [fix] [broker] Fix race-condition causing repeated delete
topic (#23522)
7b80f019fa8 is described below
commit 7b80f019fa86cf9e154e7dfcd3fd3dc1d036cbba
Author: fengyubiao <[email protected]>
AuthorDate: Tue Oct 29 16:01:57 2024 +0800
[fix] [broker] Fix race-condition causing repeated delete topic (#23522)
---
.../java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java | 9 ++++++---
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 +++++++++-
.../broker/SameAuthParamsLookupAutoClusterFailoverTest.java | 1 -
.../DisabledCreateTopicToRemoteClusterForReplicationTest.java | 2 +-
.../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 2 +-
.../broker/service/OneWayReplicatorUsingGlobalZKTest.java | 2 +-
.../org/apache/pulsar/broker/service/ReplicationTxnTest.java | 2 +-
7 files changed, 19 insertions(+), 9 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index d9269ec83b1..e47443e4e8f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -398,10 +398,13 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
}
private static MetaStoreException getException(Throwable t) {
- if (t.getCause() instanceof
MetadataStoreException.BadVersionException) {
- return new
ManagedLedgerException.BadVersionException(t.getMessage());
+ Throwable actEx = FutureUtil.unwrapCompletionException(t);
+ if (actEx instanceof MetadataStoreException.BadVersionException
badVersionException) {
+ return new
ManagedLedgerException.BadVersionException(badVersionException);
+ } else if (actEx instanceof MetaStoreException metaStoreException){
+ return metaStoreException;
} else {
- return new MetaStoreException(t);
+ return new MetaStoreException(actEx);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c86a99de0f..541c8a7a225 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1465,6 +1465,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
this.closeFutures =
new CloseFutures(new CompletableFuture(), new
CompletableFuture(), new CompletableFuture());
+ AtomicBoolean alreadyUnFenced = new AtomicBoolean();
CompletableFuture<Void> res =
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), ()
-> {
CompletableFuture<Void> deleteFuture = new
CompletableFuture<>();
@@ -1488,6 +1489,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
@@ -1503,6 +1505,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic",
topic, ex);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
@@ -1512,6 +1515,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting
topic", topic, e);
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
@@ -1542,6 +1546,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
} else {
log.error("[{}] Error
deleting topic",
topic,
exception);
+
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new
PersistenceException(exception));
@@ -1554,6 +1559,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
});
}).exceptionally(ex->{
+ alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients
before deleting topic.",
@@ -1565,7 +1571,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}).whenComplete((value, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
- unfenceTopicToResume();
+ if (!alreadyUnFenced.get()) {
+ unfenceTopicToResume();
+ }
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
index fb19ed1ddbb..b39f8135e0e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -42,7 +42,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-@Test(groups = "flaky")
public class SameAuthParamsLookupAutoClusterFailoverTest extends
OneWayReplicatorTestBase {
public void setup() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
index 0246d16b23d..0f8db4aaa73 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
@@ -44,7 +44,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
public class DisabledCreateTopicToRemoteClusterForReplicationTest extends
OneWayReplicatorTestBase {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index f2631f252ab..a8f8d7ecbbd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -104,7 +104,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 827ad78fb26..ad877e8f947 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -44,7 +44,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
index 9fe01617699..bd4a0889c73 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
@@ -59,7 +59,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
public class ReplicationTxnTest extends OneWayReplicatorTestBase {
private boolean transactionBufferSegmentedSnapshotEnabled = false;