This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 06fdbe69d91 [cherry-pick][branch-2.9] Cherry-pick #18307 (Fix can not 
delete namespace by force) (#18803)
06fdbe69d91 is described below

commit 06fdbe69d9149bc85cc648cbf3293878f68022e7
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Dec 8 17:22:31 2022 +0800

    [cherry-pick][branch-2.9] Cherry-pick #18307 (Fix can not delete namespace 
by force) (#18803)
    
    ### Motivation
    Cherry-pick https://github.com/apache/pulsar/pull/18307 for release 2.9.4.
    
    ### Modifications
    
    Cherry-pick https://github.com/apache/pulsar/pull/18307 for release 2.9.4.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 83 ++++++++++++++++++----
 .../broker/service/persistent/PersistentTopic.java | 54 ++++++++------
 .../apache/pulsar/broker/admin/NamespacesTest.java | 65 +++++++++++++++++
 .../broker/transaction/TransactionProduceTest.java | 29 --------
 .../pulsar/broker/transaction/TransactionTest.java |  8 +--
 5 files changed, 167 insertions(+), 72 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0b535b67960..7e2fa1ada7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets.SetView;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -48,6 +49,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.lang3.StringUtils;
@@ -60,6 +62,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -472,11 +475,17 @@ public abstract class NamespacesBase extends 
AdminResource {
             if (!topics.isEmpty()) {
                 Set<String> partitionedTopics = new HashSet<>();
                 Set<String> nonPartitionedTopics = new HashSet<>();
+                Set<String> allSystemTopics = new HashSet<>();
+                Set<String> allPartitionedSystemTopics = new HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
+                            if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                allPartitionedSystemTopics.add(topic);
+                                continue;
+                            }
                             String partitionedTopic = 
topicName.getPartitionedTopicName();
                             if (!partitionedTopics.contains(partitionedTopic)) 
{
                                 // Distinguish partitioned topic to avoid 
duplicate deletion of the same schema
@@ -485,6 +494,10 @@ public abstract class NamespacesBase extends AdminResource 
{
                                 partitionedTopics.add(partitionedTopic);
                             }
                         } else {
+                            if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                allSystemTopics.add(topic);
+                                continue;
+                            }
                             
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
                                     topic, true, true));
                             nonPartitionedTopics.add(topic);
@@ -508,21 +521,24 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }
 
                 final CompletableFuture<Throwable> topicFutureEx =
-                        FutureUtil.waitForAll(topicFutures).handle((result, 
exception) -> {
-                            if (exception != null) {
-                                if (exception.getCause() instanceof 
PulsarAdminException) {
-                                    asyncResponse
-                                            .resume(new 
RestException((PulsarAdminException) exception.getCause()));
-                                } else {
-                                    log.error("[{}] Failed to remove 
forcefully owned namespace {}",
-                                            clientAppId(), namespaceName, 
exception);
-                                    asyncResponse.resume(new 
RestException(exception.getCause()));
-                                }
-                                return exception;
-                            }
-
-                            return null;
-                        });
+                        FutureUtil.waitForAll(topicFutures)
+                                .thenCompose(ignore -> 
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .thenCompose(ignore -> 
internalDeleteTopicsAsync(allSystemTopics))
+                                .handle((result, exception) -> {
+                                    if (exception != null) {
+                                        if (exception.getCause() instanceof 
PulsarAdminException) {
+                                            asyncResponse.resume(
+                                                    new 
RestException((PulsarAdminException) exception.getCause()));
+                                        } else {
+                                            log.error("[{}] Failed to remove 
forcefully owned namespace {}",
+                                                    clientAppId(), 
namespaceName, exception);
+                                            asyncResponse.resume(new 
RestException(exception.getCause()));
+                                        }
+                                        return exception;
+                                    }
+
+                                    return null;
+                                });
                 if (topicFutureEx.join() != null) {
                     return;
                 }
@@ -566,6 +582,43 @@ public abstract class NamespacesBase extends AdminResource 
{
         });
     }
 
+    private CompletableFuture<Void> 
internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete 
topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            TopicName tn = TopicName.get(topicName);
+            String partitionedTopic = tn.getPartitionedTopicName();
+            
futures.add(admin.topics().deletePartitionedTopicAsync(partitionedTopic, true, 
true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+    private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> 
topicNames) {
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete 
topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            futures.add(admin.topics().deleteAsync(topicName, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+
     protected void internalDeleteNamespaceBundle(String bundleRange, boolean 
authoritative, boolean force) {
         if (force) {
             internalDeleteNamespaceBundleForcefully(bundleRange, 
authoritative);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f630ed32d47..bcf81fb4625 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1176,31 +1176,39 @@ public class PersistentTopic extends AbstractTopic
                 return null;
             });
 
-            closeClientFuture.thenAccept(delete -> {
-                CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
-                brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-                deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema 
? deleteSchema() :
-                                CompletableFuture.completedFuture(null))
-                        .thenCompose(__ -> deleteTopicPolicies())
-                        .thenCompose(__ -> transactionBufferCleanupAndClose())
-                        .whenComplete((v, ex) -> {
-                            if (ex != null) {
-                                log.error("[{}] Error deleting topic", topic, 
ex);
-                                unfenceTopicToResume();
-                                deleteFuture.completeExceptionally(ex);
-                            } else {
-                                List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
-                                subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
-                                
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
-                                    if (e != null) {
-                                        log.error("[{}] Error deleting topic", 
topic, e);
+                closeClientFuture.thenAccept(__ -> {
+                    CompletableFuture<Void> deleteTopicAuthenticationFuture = 
new CompletableFuture<>();
+                    brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+                        deleteTopicAuthenticationFuture.thenCompose(ignore -> 
deleteSchema ? deleteSchema() :
+                                        
CompletableFuture.completedFuture(null))
+                                .thenCompose(ignore -> {
+                                    if 
(!this.getBrokerService().getPulsar().getBrokerService()
+                                            
.isSystemTopic(TopicName.get(topic))) {
+                                        return deleteTopicPolicies();
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                })
+                                .thenCompose(ignore -> 
transactionBufferCleanupAndClose())
+                                .whenComplete((v, ex) -> {
+                                    if (ex != null) {
+                                        log.error("[{}] Error deleting topic", 
topic, ex);
                                         unfenceTopicToResume();
-                                        deleteFuture.completeExceptionally(e);
+                                        deleteFuture.completeExceptionally(ex);
                                     } else {
-                                        ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                                            @Override
-                                            public void 
deleteLedgerComplete(Object ctx) {
-                                                
brokerService.removeTopicFromCache(PersistentTopic.this);
+                                        List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
+                                        subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+
+                                    
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+                                        if (e != null) {
+                                            log.error("[{}] Error deleting 
topic", topic, e);
+                                            unfenceTopicToResume();
+                                            
deleteFuture.completeExceptionally(e);
+                                        } else {
+                                            ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                                @Override
+                                                public void 
deleteLedgerComplete(Object ctx) {
+                                                    
brokerService.removeTopicFromCache(PersistentTopic.this);
 
                                                 
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c4bef2776a2..b5b57769e3b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.v1.Namespaces;
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -69,6 +70,7 @@ import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -79,9 +81,11 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BundlesData;
@@ -105,6 +109,7 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -1771,4 +1776,64 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         BundlesData bundles = admin.namespaces().getBundles(namespace);
         assertEquals(bundles.getNumBundles(), 14);
     }
+    @Test
+    public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws 
Exception {
+        String namespace = this.testTenant + "/delete-namespace";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-namespace",
+                "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+        Field policesService = 
PulsarService.class.getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new 
SystemTopicBasedTopicPoliciesService(pulsar));
+
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. change the order of the topics in this namespace.
+        List<String> topics = 
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+        Assert.assertTrue(topics.size() >= 2);
+        for (int i = 0; i < topics.size(); i++) {
+            if 
(topics.get(i).contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+                String systemTopic = topics.get(i);
+                topics.set(i, topics.get(0));
+                topics.set(0, systemTopic);
+            }
+        }
+        NamespaceService mockNamespaceService = 
spy(pulsar.getNamespaceService());
+        Field namespaceServiceField = 
PulsarService.class.getDeclaredField("nsService");
+        namespaceServiceField.setAccessible(true);
+        namespaceServiceField.set(pulsar, mockNamespaceService);
+        
doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+        // 5. delete the namespace
+        admin.namespaces().deleteNamespace(namespace, true);
+    }
+
+    @Test
+    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws 
Exception {
+        String namespace = this.testTenant + "/delete-systemTopic";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-systemTopic",
+                "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        Field policesService = 
PulsarService.class.getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new 
SystemTopicBasedTopicPoliciesService(pulsar));
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. delete the policies topic and the topic wil not to clear topic 
polices
+        admin.topics().delete(namespace + "/" + 
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 316c2f9f9df..cbae03b1a8b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends 
TransactionTestBase {
         produceTest(true);
     }
 
-    @Test
-    public void testDeleteNamespaceBeforeCommit() throws Exception {
-        final String topic = "persistent://" + NAMESPACE3 + 
"/testDeleteTopicBeforeCommit";
-        PulsarClient pulsarClient = this.pulsarClient;
-        Transaction tnx = pulsarClient.newTransaction()
-                .withTransactionTimeout(60, TimeUnit.SECONDS)
-                .build().get();
-        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
-        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
-        Assert.assertTrue(txnIdMostBits > -1);
-        Assert.assertTrue(txnIdLeastBits > -1);
-
-        @Cleanup
-        Producer<byte[]> outProducer = pulsarClient
-                .newProducer()
-                .topic(topic)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .enableBatching(false)
-                .create();
-
-        String content = "Hello Txn";
-        outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
-        try {
-            admin.namespaces().deleteNamespace(NAMESPACE3, true);
-        } catch (Exception ignore) { }
-        tnx.commit().get();
-    }
-
     @Test
     public void produceAndAbortTest() throws Exception {
         produceTest(false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 1e9ed80307d..1ca63d59bb7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -39,7 +39,6 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.Timeout;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
@@ -70,8 +69,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
@@ -159,7 +156,8 @@ public class TransactionTest extends TransactionTestBase {
     public void testCreateTransactionSystemTopic() throws Exception {
         String subName = "test";
         String topicName = TopicName.get(NAMESPACE1 + "/" + 
"testCreateTransactionSystemTopic").toString();
-
+        admin.namespaces().deleteNamespace(NAMESPACE1, true);
+        admin.namespaces().createNamespace(NAMESPACE1);
         try {
             // init pending ack
             @Cleanup
@@ -175,7 +173,7 @@ public class TransactionTest extends TransactionTestBase {
 
         // getList does not include transaction system topic
         List<String> list = admin.topics().getList(NAMESPACE1);
-        assertEquals(list.size(), 4);
+        assertFalse(list.isEmpty());
         list.forEach(topic -> 
assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
 
         try {

Reply via email to