This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eb3169937e9 [fix][admin] Filter pending ack topic while deleting the 
namespace (#19719)
eb3169937e9 is described below

commit eb3169937e93d87d7587331aab18077849e14ca7
Author: ran <[email protected]>
AuthorDate: Sun Mar 12 15:16:30 2023 +0800

    [fix][admin] Filter pending ack topic while deleting the namespace (#19719)
    
    ### Motivation
    
    A transaction system topic not found exception may occur while deleting the 
namespace.
    
    **How to happen?**
    1. Make sure the topic has a pending ack system 
topic(`public/default/test-delete-ns-sub__transaction_pending_ack`).
    2. Delete the namespace `public/default`.
    3. Namespace deletion operation will try to delete the user-created topic 
`public/default/test-delete-ns` first, at this step, the topic will unsubscribe 
from all subscriptions, and delete the corresponding pending ack system topic.
    4. After the namespace deletion operation delete all user-created topics, 
it will try to delete all system topics, which contain the pending ack topic 
`public/default/test-delete-ns-sub__transaction_pending_ack`.
    5. The topicNotFound exception occurs.
    
    There are two ways to fix this problem.
    1. Remove the pending ack topics from the pre-delete topic list.
    2. Ignore the topicNotFound exception while deleting the namespace.
    
    ~~I think ignoring the exception is better because we don't know if there 
are other similar topics in the future.~~
    
    After discussion, we decide to filter the pending ack topics while deleting 
the namespace.
    
    ### Modifications
    
    Ignore the topicNotFound exception while deleting the namespace.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  7 ++++-
 .../pulsar/broker/transaction/TransactionTest.java | 34 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5be675f7b63..cffc94b1892 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -249,7 +249,7 @@ public abstract class NamespacesBase extends AdminResource {
                                     } else {
                                         if 
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
                                             topicPolicy.add(topic);
-                                        } else {
+                                        } else if 
(!isDeletedAlongWithUserCreatedTopic(topic)) {
                                             allSystemTopics.add(topic);
                                         }
                                     }
@@ -344,6 +344,11 @@ public abstract class NamespacesBase extends AdminResource 
{
                 });
     }
 
+    private boolean isDeletedAlongWithUserCreatedTopic(String topic) {
+        // The transaction pending ack topic will be deleted while topic 
unsubscribe corresponding subscription.
+        return topic.endsWith(SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+    }
+
     private CompletableFuture<Void> 
internalDeletePartitionedTopicsAsync(List<String> topicNames) {
         if (CollectionUtils.isEmpty(topicNames)) {
             return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a568db3d9f1..20aeac0ed64 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -76,6 +76,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.pulsar.broker.PulsarService;
@@ -114,6 +115,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -1642,4 +1644,36 @@ public class TransactionTest extends TransactionTestBase 
{
                     .send();
         txn.commit();
     }
+
+    @Test
+    public void testDeleteNamespace() throws Exception {
+        String namespace = TENANT + "/ns-" + 
RandomStringUtils.randomAlphabetic(5);
+        String topic = namespace + "/test-delete-ns";
+        admin.namespaces().createNamespace(namespace);
+        try (Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create()) {
+            producer.newMessage().value("test".getBytes()).send();
+
+            Transaction txn = this.pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build().get();
+            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
+                    .topic(topic)
+                    
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscriptionName("sub")
+                    .subscribe()) {
+                Message<byte[]> message = consumer.receive();
+                consumer.acknowledgeAsync(message.getMessageId(), txn).get();
+            }
+            try (Producer<byte[]> outProducer = this.pulsarClient.newProducer()
+                    .topic(topic + "-out")
+                    .create()) {
+                outProducer.newMessage(txn).value("output".getBytes()).send();
+            }
+            txn.commit();
+        }
+        admin.namespaces().deleteNamespace(namespace, true);
+    }
+
 }

Reply via email to