This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a08c8ee5ccb [fix][broker] Allow Access to System Topic Metadata for 
Reader Creation Post-Namespace Deletion (#20304)
a08c8ee5ccb is described below

commit a08c8ee5ccbba279224e43c7c7afa8c4a00031bb
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri May 12 19:29:42 2023 +0800

    [fix][broker] Allow Access to System Topic Metadata for Reader Creation 
Post-Namespace Deletion (#20304)
    
    ## Motivation
    After the segmented snapshot feature is enabled, deleting topics requires 
opening readers. Furthermore, these readers should be opened and closed as 
they are used, which means we should not keep pre-created readers. However, 
once namespace deletion has started, it is currently not allowed to fetch 
partitioned topic metadata or perform a lookup, which makes it impossible to 
create readers. As a result, the namespace cannot be deleted.
    
    ## Modification
    Allow system topic metadata to be fetched after namespace deletion has 
started, so that readers can be created to clean up topic data.
    
    (cherry picked from commit 5d5ec947249df50bf35a78b6a2a0d3b00d97ca66)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 14 ++++++-
 .../pulsar/broker/lookup/TopicLookupBase.java      | 44 ++++++++++++----------
 .../pulsar/broker/transaction/TransactionTest.java | 21 +++++++++++
 3 files changed, 58 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fcade8270cb..de75c5f2132 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
 import static 
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
@@ -4408,8 +4409,13 @@ public class PersistentTopicsBase extends AdminResource {
         // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, 
client fails while creating
         // producer/consumer
+        // It is necessary for system topic operations because system topics 
are used to store metadata
+        // and other vital information. Even after namespace deletion has 
started,
+        // we need to access the metadata of system topics to create readers 
and clean up topic data.
+        // If we don't do this, it can prevent namespace deletion due to 
inaccessible readers.
         authorizationFuture.thenCompose(__ ->
-                        checkLocalOrGetPeerReplicationCluster(pulsar, 
topicName.getNamespaceObject()))
+                        checkLocalOrGetPeerReplicationCluster(pulsar, 
topicName.getNamespaceObject(),
+                                SystemTopicNames.isSystemTopic(topicName)))
                 .thenCompose(res ->
                         
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
                 .thenAccept(metadata -> {
@@ -4436,7 +4442,11 @@ public class PersistentTopicsBase extends AdminResource {
         // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, 
client fails while creating
         // producer/consumer
-        checkLocalOrGetPeerReplicationCluster(pulsar, 
topicName.getNamespaceObject())
+        // It is necessary for system topic operations because system topics 
are used to store metadata
+        // and other vital information. Even after namespace deletion has 
started,
+        // we need to access the metadata of system topics to create readers 
and clean up topic data.
+        // If we don't do this, it can prevent namespace deletion due to 
inaccessible readers.
+        checkLocalOrGetPeerReplicationCluster(pulsar, 
topicName.getNamespaceObject(), isSystemTopic(topicName))
             .thenCompose(res -> pulsar.getBrokerService()
                 
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
             .thenAccept(metadata -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 3b64d2a9f83..bd70201cba5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -41,6 +41,7 @@ import 
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -221,26 +222,31 @@ public class TopicLookupBase extends PulsarWebResource {
                 // (2) authorize client
                 checkAuthorizationAsync(pulsarService, topicName, clientAppId, 
authenticationData).thenRun(() -> {
                         // (3) validate global namespace
+                        // It is necessary for system topic operations because 
system topics are used to store metadata
+                        // and other vital information. Even after namespace 
deletion has started,
+                        // we need to access the metadata of system topics to 
create readers and clean up topic data.
+                        // If we don't do this, it can prevent namespace 
deletion due to inaccessible readers.
                         checkLocalOrGetPeerReplicationCluster(pulsarService,
-                                
topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
-                            if (peerClusterData == null) {
-                                // (4) all validation passed: initiate lookup
-                                validationFuture.complete(null);
-                                return;
-                            }
-                            // if peer-cluster-data is present it means 
namespace is owned by that peer-cluster and
-                            // request should be redirect to the peer-cluster
-                            if 
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
-                                    && 
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
-                                
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService 
url is not configured",
-                                        requestId));
-                                return;
-                            }
-                            
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
-                                    peerClusterData.getBrokerServiceUrlTls(), 
true, LookupType.Redirect,
-                                    requestId,
-                                    false));
+                                topicName.getNamespaceObject(), 
SystemTopicNames.isSystemTopic(topicName))
+                                .thenAccept(peerClusterData -> {
+                                    if (peerClusterData == null) {
+                                        // (4) all validation passed: initiate 
lookup
+                                        validationFuture.complete(null);
+                                        return;
+                                    }
+                                    // if peer-cluster-data is present it 
means namespace is owned by that peer-cluster
+                                    // and request should be redirect to the 
peer-cluster
+                                    if 
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
+                                            && 
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
+                                        
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
+                                                "Redirected cluster's 
brokerService url is not configured",
+                                                requestId));
+                                        return;
+                                    }
+                                    
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
+                                            
peerClusterData.getBrokerServiceUrlTls(), true,
+                                            LookupType.Redirect, requestId,
+                                            false));
                         }).exceptionally(ex -> {
                             Throwable throwable = 
FutureUtil.unwrapCompletionException(ex);
                             if (throwable instanceof RestException 
restException){
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c3533e70cf8..c4ec2ec766e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -272,6 +272,27 @@ public class TransactionTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void testCanDeleteNamespaceWhenEnableTxnSegmentedSnapshot() throws 
Exception {
+        // Enable the segmented snapshot feature
+        
pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        
pulsarServiceList.get(0).getConfig().setForceDeleteNamespaceAllowed(true);
+
+        // Create a new namespace
+        String namespaceName =  TENANT + 
"/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic";
+        admin.namespaces().createNamespace(namespaceName);
+
+        // Create a new topic in the namespace
+        String topicName = "persistent://" + namespaceName + "/newTopic";
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        producer.close();
+
+        // Destroy the namespace after the test
+        admin.namespaces().deleteNamespace(namespaceName, true);
+        
pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+    }
+
     @Test
     public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
         String subName = "test";

Reply via email to