This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eb3169937e9 [fix][admin] Filter pending ack topic while deleting the
namespace (#19719)
eb3169937e9 is described below
commit eb3169937e93d87d7587331aab18077849e14ca7
Author: ran <[email protected]>
AuthorDate: Sun Mar 12 15:16:30 2023 +0800
[fix][admin] Filter pending ack topic while deleting the namespace (#19719)
### Motivation
A transaction system topic not found exception may occur while deleting the
namespace.
**How does it happen?**
1. Make sure the topic has a pending ack system
topic (`public/default/test-delete-ns-sub__transaction_pending_ack`).
2. Delete the namespace `public/default`.
3. The namespace deletion operation will try to delete the user-created topic
`public/default/test-delete-ns` first. At this step, the topic will unsubscribe
from all subscriptions and delete the corresponding pending ack system topic.
4. After the namespace deletion operation deletes all user-created topics,
it will try to delete all system topics, which include the pending ack topic
`public/default/test-delete-ns-sub__transaction_pending_ack`.
5. The topicNotFound exception occurs, because the pending ack topic was already deleted in step 3.
There are two ways to fix this problem.
1. Remove the pending ack topics from the pre-delete topic list.
2. Ignore the topicNotFound exception while deleting the namespace.
~~I think ignoring the exception is better because we don't know if there
are other similar topics in the future.~~
After discussion, we decided to filter out the pending ack topics while deleting
the namespace.
### Modifications
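Filter the pending ack topics out of the system topic deletion list while deleting the namespace, because they are already deleted along with the user-created topic's subscriptions.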
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 7 ++++-
.../pulsar/broker/transaction/TransactionTest.java | 34 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5be675f7b63..cffc94b1892 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -249,7 +249,7 @@ public abstract class NamespacesBase extends AdminResource {
} else {
if
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
topicPolicy.add(topic);
- } else {
+ } else if
(!isDeletedAlongWithUserCreatedTopic(topic)) {
allSystemTopics.add(topic);
}
}
@@ -344,6 +344,11 @@ public abstract class NamespacesBase extends AdminResource
{
});
}
+ private boolean isDeletedAlongWithUserCreatedTopic(String topic) {
+ // The transaction pending ack topic will be deleted while topic
unsubscribe corresponding subscription.
+ return topic.endsWith(SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+ }
+
private CompletableFuture<Void>
internalDeletePartitionedTopicsAsync(List<String> topicNames) {
if (CollectionUtils.isEmpty(topicNames)) {
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a568db3d9f1..20aeac0ed64 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -76,6 +76,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.PulsarService;
@@ -114,6 +115,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -1642,4 +1644,36 @@ public class TransactionTest extends TransactionTestBase
{
.send();
txn.commit();
}
+
+ @Test
+ public void testDeleteNamespace() throws Exception {
+ String namespace = TENANT + "/ns-" +
RandomStringUtils.randomAlphabetic(5);
+ String topic = namespace + "/test-delete-ns";
+ admin.namespaces().createNamespace(namespace);
+ try (Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create()) {
+ producer.newMessage().value("test".getBytes()).send();
+
+ Transaction txn = this.pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
+ .topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("sub")
+ .subscribe()) {
+ Message<byte[]> message = consumer.receive();
+ consumer.acknowledgeAsync(message.getMessageId(), txn).get();
+ }
+ try (Producer<byte[]> outProducer = this.pulsarClient.newProducer()
+ .topic(topic + "-out")
+ .create()) {
+ outProducer.newMessage(txn).value("output".getBytes()).send();
+ }
+ txn.commit();
+ }
+ admin.namespaces().deleteNamespace(namespace, true);
+ }
+
}