This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e3e78d992e [cleanup] PIP-457: Remove NamespaceName.isGlobal() and TopicName.isGlobal() (#25319)
2e3e78d992e is described below

commit 2e3e78d992e873e358d16fa875f79cb607019284
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Mar 15 23:58:25 2026 -0700

    [cleanup] PIP-457: Remove NamespaceName.isGlobal() and TopicName.isGlobal() (#25319)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |   2 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 349 ++++++---------------
 .../broker/admin/v2/NonPersistentTopics.java       |  14 +-
 .../pulsar/broker/service/BrokerService.java       |   3 -
 .../service/nonpersistent/NonPersistentTopic.java  |  58 ++--
 .../broker/service/persistent/PersistentTopic.java |  84 +++--
 .../pulsar/broker/web/PulsarWebResource.java       |   2 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |   3 +-
 .../apache/pulsar/common/naming/NamespaceName.java |   4 -
 .../org/apache/pulsar/common/naming/TopicName.java |   4 -
 .../pulsar/common/naming/NamespaceNameTest.java    |   2 -
 11 files changed, 169 insertions(+), 356 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ea8e584c6e3..190ed011143 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -597,7 +597,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 .thenCompose(__ -> 
provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
                 .thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
                 .thenRun(() -> {
-                    if (!createLocalTopicOnly && topicName.isGlobal()
+                    if (!createLocalTopicOnly
                             && 
pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
                         
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
                         log.info("[{}] Successfully created partitioned for 
topic {} for the remote clusters",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fed59cf06e2..ad6854e2426 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -328,11 +328,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Void> 
internalCreateNonPartitionedTopicAsync(boolean authoritative,
                                                      Map<String, String> 
properties) {
-        CompletableFuture<Void> ret = 
validateNonPartitionTopicNameAsync(topicName.getLocalName());
-        if (topicName.isGlobal()) {
-            ret = ret.thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName));
-        }
-        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        return validateNonPartitionTopicNameAsync(topicName.getLocalName())
+                .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
            .thenCompose(__ -> 
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
                    NamespaceOperation.CREATE_TOPIC))
            .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
false, false))
@@ -472,7 +470,7 @@ public class PersistentTopicsBase extends AdminResource {
                                     })
                     );
                 }).thenCompose(__ -> {
-                    if (updateLocal || !topicName.isGlobal()) {
+                    if (updateLocal) {
                         return CompletableFuture.completedFuture(null);
                     }
                     // update remote cluster
@@ -531,9 +529,9 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) 
{
         getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
             if (metadata != null && metadata.partitions > 0) {
-                CompletableFuture<Void> future = 
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
-                        NamespaceOperation.CREATE_TOPIC);
-                future.thenCompose(__ -> 
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
+                validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+                        NamespaceOperation.CREATE_TOPIC)
+                .thenCompose(__ -> 
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
                     asyncResponse.resume(Response.noContent().build());
                 }).exceptionally(e -> {
                     log.error("[{}] Failed to create partitions for topic {}", 
clientAppId(), topicName);
@@ -907,13 +905,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            }
-            return CompletableFuture.completedFuture(null);
-        }).thenAccept(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenAccept(__ -> {
            // If the topic name is a partition name, no need to get partition 
topic metadata again
            if (topicName.isPartitioned()) {
                if (isTransactionCoordinatorAssign(topicName)) {
@@ -1182,14 +1176,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetSubscriptions(AsyncResponse asyncResponse, 
boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        validateTopicOperationAsync(topicName, 
TopicOperation.GET_SUBSCRIPTIONS)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenAccept(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
@@ -1312,28 +1301,18 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected CompletableFuture<? extends TopicStats> 
internalGetStatsAsync(boolean authoritative,
                                                                             
GetStatsOptions getStatsOptions) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
-        return future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        return validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> getTopicReferenceAsync(topicName))
         .thenCompose(topic -> topic.asyncGetStats(getStatsOptions));
     }
 
     protected CompletableFuture<PersistentTopicInternalStats> 
internalGetInternalStatsAsync(boolean authoritative,
                                                                                
             boolean metadata) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
-        return future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        return validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> {
             if (metadata) {
                 return validateTopicOperationAsync(topicName, 
TopicOperation.GET_METADATA);
@@ -1345,13 +1324,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, 
boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            }
-            return CompletableFuture.completedFuture(null);
-        }).thenAccept(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenAccept(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
                 
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
@@ -1463,13 +1438,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, 
boolean authoritative, boolean perPartition,
                                                GetStatsOptions 
getStatsOptions) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            }
-            return  CompletableFuture.completedFuture(null);
-        }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
+        validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
                 authoritative, false)).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1553,14 +1524,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetPartitionedStatsInternal(AsyncResponse 
asyncResponse, boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
+        validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
         .thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1611,14 +1577,9 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> internalDeleteSubscriptionAsync(String 
subName,
                                                                       boolean 
authoritative,
                                                                       boolean 
force) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE, subName);
-        return future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> {
+        return validateTopicOperationAsync(topicName, 
TopicOperation.UNSUBSCRIBE, subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> {
             if (topicName.isPartitioned()) {
                 return 
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, 
force);
             } else {
@@ -1790,14 +1751,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
@@ -1903,14 +1859,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalSkipMessages(AsyncResponse asyncResponse, String 
subName, int numMessages,
                                         boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
         .thenCompose(partitionMetadata -> {
              if (partitionMetadata.partitions > 0) {
@@ -1967,14 +1918,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalExpireMessagesForAllSubscriptions(AsyncResponse 
asyncResponse, int expireTimeInSeconds,
             boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
+        validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
         .thenAccept(partitionMetadata -> {
             if (topicName.isPartitioned()) {
                 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
@@ -2102,14 +2048,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Void> internalResetCursorAsync(String subName, 
long timestamp,
             boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName);
-        return future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        return validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
@@ -2206,14 +2147,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalCreateSubscription(AsyncResponse asyncResponse, 
String subscriptionName,
             MessageIdImpl messageId, boolean authoritative, boolean 
replicated, Map<String, String> properties) {
-        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.SUBSCRIBE,
-                subscriptionName);
-        ret.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            }
-            return CompletableFuture.completedFuture(null);
-        }).thenAccept(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, 
subscriptionName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenAccept(__ -> {
             final MessageIdImpl targetMessageId = messageId == null ? 
(MessageIdImpl) MessageId.latest : messageId;
             log.info("[{}][{}] Creating subscription {} at message id {} with 
properties {}", clientAppId(),
                     topicName, subscriptionName, targetMessageId, properties);
@@ -2372,13 +2308,9 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalUpdateSubscriptionProperties(AsyncResponse 
asyncResponse, String subName,
                                                         Map<String, String> 
subscriptionProperties,
                                                         boolean authoritative) 
{
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            }
-            return CompletableFuture.completedFuture(null);
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, 
subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
             if (topicName.isPartitioned()) {
                 
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, 
subName,
                         subscriptionProperties, authoritative);
@@ -2450,13 +2382,9 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalAnalyzeSubscriptionBacklog(AsyncResponse 
asyncResponse, String subName,
                                                       Optional<Position> 
position,
                                                       boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            }
-            return CompletableFuture.completedFuture(null);
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> {
                     if (topicName.isPartitioned()) {
                         return CompletableFuture.completedFuture(null);
@@ -2488,13 +2416,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalGetSubscriptionProperties(AsyncResponse 
asyncResponse, String subName,
                                                         boolean authoritative) 
{
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            }
-            return CompletableFuture.completedFuture(null);
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
             if (topicName.isPartitioned()) {
                 
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
                         authoritative);
@@ -2572,8 +2496,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, 
String subName, boolean authoritative,
             MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
-        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName);
-        ret.thenCompose(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, 
subName)
+        .thenCompose(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (!topicName.isPartitioned()) {
                 return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -2589,13 +2513,8 @@ public class PersistentTopicsBase extends AdminResource {
             } else {
                 return CompletableFuture.completedFuture(null);
             }
-        }).thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> {
+        }).thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> {
             log.info("[{}][{}] received reset cursor on subscription {} to 
position {}", clientAppId(), topicName,
                     subName, messageId);
             return validateTopicOwnershipAsync(topicName, authoritative);
@@ -2746,14 +2665,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Response> internalGetMessageById(long 
ledgerId, long entryId, boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
-        return future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> {
+        return validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> {
             if (topicName.isPartitioned()) {
                 return CompletableFuture.completedFuture(null);
             } else {
@@ -2809,14 +2723,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<MessageId> 
internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
-        return future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> {
+        return validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> {
                 if (topicName.isPartitioned()) {
                     return CompletableFuture.completedFuture(null);
                 } else {
@@ -2880,8 +2789,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Response> internalPeekNthMessageAsync(String 
subName, int messagePosition,
                                                                       boolean 
authoritative) {
-        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES, subName);
-        return ret.thenCompose(__ -> {
+        return validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES, subName)
+        .thenCompose(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (!topicName.isPartitioned()) {
                 return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -2939,14 +2848,9 @@ public class PersistentTopicsBase extends AdminResource {
                                                                       boolean 
authoritative) {
         long messagePositionLocal = messagePosition < 1 ? 1 : messagePosition;
         String initialPositionLocal = initialPosition == null ? "latest" : 
initialPosition;
-        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES);
-        return ret.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        return validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> {
             if (!topicName.isPartitioned()) {
                 return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -3174,15 +3078,10 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<PersistentOfflineTopicStats> 
internalGetBacklogAsync(boolean authoritative) {
-        CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            ret = CompletableFuture.completedFuture(null);
-        }
         // Validate that namespace exists, throw 404 if it doesn't exist
         // note that we do not want to load the topic and hence skip 
authorization check
-        return ret.thenCompose(__ -> 
namespaceResources().getPoliciesAsync(namespaceName))
+        return validateGlobalNamespaceOwnershipAsync(namespaceName)
+                .thenCompose(__ -> 
namespaceResources().getPoliciesAsync(namespaceName))
                 .thenCompose(__ -> {
                     PersistentOfflineTopicStats offlineTopicStats =
                             
pulsar().getBrokerService().getOfflineTopicStat(topicName);
@@ -3245,8 +3144,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse,
                                                      MessageIdImpl messageId, 
boolean authoritative) {
-        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.GET_BACKLOG_SIZE);
-        ret.thenCompose(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE)
+        .thenCompose(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (!topicName.isPartitioned()) {
                 return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -3262,13 +3161,8 @@ public class PersistentTopicsBase extends AdminResource {
             } else {
                 return CompletableFuture.completedFuture(null);
             }
-        }).thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        }).thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(unused -> getTopicReferenceAsync(topicName))
         .thenAccept(t -> {
             PersistentTopic topic = (PersistentTopic) t;
@@ -3725,13 +3619,7 @@ public class PersistentTopicsBase extends AdminResource {
             return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
                     "Not allowed to set/get topic policy for a partition"));
         }
-        CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            ret = CompletableFuture.completedFuture(null);
-        }
-        return ret
+        return validateGlobalNamespaceOwnershipAsync(namespaceName)
                 .thenCompose(__ -> checkTopicExistsAsync(topicName))
                 .thenCompose(topicExistsInfo -> {
                     if (!topicExistsInfo.isExists()) {
@@ -3792,14 +3680,9 @@ public class PersistentTopicsBase extends AdminResource {
                     "Termination of a system topic is not allowed"));
         }
 
-        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.TERMINATE);
-        return ret.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        return validateTopicOperationAsync(topicName, TopicOperation.TERMINATE)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
         .thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions > 0) {
@@ -3825,14 +3708,9 @@ public class PersistentTopicsBase extends AdminResource {
             asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, 
msg));
             return;
         }
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.TERMINATE);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
+        validateTopicOperationAsync(topicName, TopicOperation.TERMINATE)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
         .thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 String msg = "Termination of a non-partitioned topic is not 
allowed using partitioned-terminate"
@@ -3893,15 +3771,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
                                                      int expireTimeInSeconds, 
boolean authoritative) {
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES,
-                subName);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, 
subName)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ ->
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             getPartitionedTopicMetadataAsync(topicName, authoritative, false)
@@ -4056,8 +3928,8 @@ public class PersistentTopicsBase extends AdminResource {
             asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, 
msg));
             return;
         }
-        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName);
-        ret.thenCompose(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, 
subName)
+        .thenCompose(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (!topicName.isPartitioned()) {
                 return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -4072,13 +3944,8 @@ public class PersistentTopicsBase extends AdminResource {
             } else {
                 return CompletableFuture.completedFuture(null);
             }
-        }).thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        }).thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
         .thenCompose(__ -> {
             log.info("[{}][{}] Received expire messages on subscription {} to 
position {}", clientAppId(),
                     topicName, subName, messageId);
@@ -4180,14 +4047,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalTriggerCompaction(AsyncResponse asyncResponse, 
boolean authoritative) {
         log.info("[{}] Trigger compaction on topic {}", clientAppId(), 
topicName);
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.COMPACT);
-        future.thenCompose(__ -> {
-            if (topicName.isGlobal()) {
-                return validateGlobalNamespaceOwnershipAsync(namespaceName);
-            } else {
-                return CompletableFuture.completedFuture(null);
-            }
-        }).thenAccept(__ -> {
+        validateTopicOperationAsync(topicName, TopicOperation.COMPACT)
+        .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+        .thenAccept(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
                 internalTriggerCompactionNonPartitionedTopic(asyncResponse, 
authoritative);
@@ -4957,13 +4819,6 @@ public class PersistentTopicsBase extends AdminResource {
             return;
         }
 
-        // Reject the request if the topic is not global
-        if (!topicName.isGlobal()) {
-            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Cannot enable/disable replicated subscriptions on 
non-global topics"));
-            return;
-        }
-
         // 1.Permission to consume this topic is required
         // 2.Redirect the request to the peer-cluster if the local cluster is 
not included in the replication clusters
         CompletableFuture<Void> validateFuture =
@@ -5097,13 +4952,6 @@ public class PersistentTopicsBase extends AdminResource {
             return;
         }
 
-        // Reject the request if the topic is not global
-        if (!topicName.isGlobal()) {
-            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Cannot get replicated subscriptions on non-global 
topics"));
-            return;
-        }
-
         // Permission to consume this topic is required
         CompletableFuture<Void> validateFuture =
                 validateTopicOperationAsync(topicName, 
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
@@ -5392,14 +5240,9 @@ public class PersistentTopicsBase extends AdminResource {
                     "Invalid message index: " + index));
         }
         int partitionIndex = topicName.getPartitionIndex();
-        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
-        return future.thenCompose(__ -> {
-                    if (topicName.isGlobal()) {
-                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }).thenCompose(__ -> {
+        return validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES)
+                .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
+                .thenCompose(__ -> {
                     if (topicName.isPartitioned()) {
                         return CompletableFuture.completedFuture(null);
                     } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 9071884185d..16cbc7104d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -239,14 +239,12 @@ public class NonPersistentTopics extends PersistentTopics 
{
                 throw new RestException(Response.Status.PRECONDITION_FAILED,
                         "Partitioned Topic Name should not contain 
'-partition-'");
             }
-            if (topicName.isGlobal()) {
-                try {
-                    validateGlobalNamespaceOwnership(namespaceName);
-                } catch (Exception e) {
-                    log.error("[{}] Failed to get partitioned stats for {}", 
clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                    return;
-                }
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get partitioned stats for {}", 
clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
             }
             getPartitionedTopicMetadataAsync(topicName,
                     authoritative, false).thenAccept(partitionMetadata -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 513b16a5255..79a26ec56ae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2791,9 +2791,6 @@ public class BrokerService implements Closeable {
      * @param namespace
      */
     private void unloadDeletedReplNamespace(Policies data, NamespaceName 
namespace) {
-        if (!namespace.isGlobal()) {
-            return;
-        }
         final String localCluster = 
this.pulsar.getConfiguration().getClusterName();
         if (pulsar.getBrokerService().isCurrentClusterAllowed(namespace, 
data)) {
             return;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 2b14df499ae..45a92c92f7e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -583,7 +583,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+        if (NamespaceService.isHeartbeatNamespace(name)
                 || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
@@ -990,11 +990,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
     }
 
     public boolean isActive() {
-        if (TopicName.get(topic).isGlobal()) {
-            // No local consumers and no local producers
-            return !subscriptions.isEmpty() || hasLocalProducers();
-        }
-        return currentUsageCount() != 0 || !subscriptions.isEmpty();
+        // No local consumers and no local producers
+        return !subscriptions.isEmpty() || hasLocalProducers();
     }
 
     @Override
@@ -1050,34 +1047,31 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         } else {
             if (System.nanoTime() - lastActive > 
TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
 
-                if (TopicName.get(topic).isGlobal()) {
-                    // For global namespace, close repl producers first.
-                    // Once all repl producers are closed, we can delete the 
topic,
-                    // provided no remote producers connected to the broker.
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Global topic inactive for {} seconds, 
closing repl producers.", topic,
-                            maxInactiveDurationInSec);
-                    }
+                // Close repl producers first.
+                // Once all repl producers are closed, we can delete the topic,
+                // provided no remote producers connected to the broker.
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Topic inactive for {} seconds, closing 
repl producers.", topic,
+                        maxInactiveDurationInSec);
+                }
 
-                    stopReplProducers().thenCompose(v -> delete(true, false))
-                            .thenCompose(__ -> 
tryToDeletePartitionedMetadata())
-                            .thenRun(() -> log.info("[{}] Topic deleted 
successfully due to inactivity", topic))
-                            .exceptionally(e -> {
-                                Throwable throwable = e.getCause();
-                                if (throwable instanceof TopicBusyException) {
-                                    // topic became active again
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("[{}] Did not delete busy 
topic: {}", topic,
-                                                throwable.getMessage());
-                                    }
-                                    replicators.forEach((region, replicator) 
-> replicator.startProducer());
-                                } else {
-                                    log.warn("[{}] Inactive topic deletion 
failed", topic, e);
+                stopReplProducers().thenCompose(v -> delete(true, false))
+                        .thenCompose(__ -> tryToDeletePartitionedMetadata())
+                        .thenRun(() -> log.info("[{}] Topic deleted 
successfully due to inactivity", topic))
+                        .exceptionally(e -> {
+                            Throwable throwable = e.getCause();
+                            if (throwable instanceof TopicBusyException) {
+                                // topic became active again
+                                if (log.isDebugEnabled()) {
+                                    log.debug("[{}] Did not delete busy topic: 
{}", topic,
+                                            throwable.getMessage());
                                 }
-                                return null;
-                            });
-
-                }
+                                replicators.forEach((region, replicator) -> 
replicator.startProducer());
+                            } else {
+                                log.warn("[{}] Inactive topic deletion 
failed", topic, e);
+                            }
+                            return null;
+                        });
             }
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index da4d717e798..dfbb2725db4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1942,7 +1942,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+        if (NamespaceService.isHeartbeatNamespace(name)
                 || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
@@ -2112,7 +2112,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     if (nsPolicies.isPresent()) {
                         allowedClusters = nsPolicies.get().allowed_clusters;
                     }
-                    if (TopicName.get(topic).isGlobal() && 
!topicRepls.contains(localCluster)
+                    if (!topicRepls.contains(localCluster)
                             && !allowedClusters.contains(localCluster)) {
                         log.warn("Local cluster {} is not part of global 
namespace repl list {} and allowed list {}",
                                 localCluster, topicRepls, allowedClusters);
@@ -3197,12 +3197,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 }
                 break;
         }
-        if (TopicName.get(topic).isGlobal()) {
-            // no local producers
-            return hasLocalProducers();
-        } else {
-            return currentUsageCount() != 0;
-        }
+        // no local producers
+        return hasLocalProducers();
     }
 
     private boolean hasBacklogs(boolean getPreciseBacklog) {
@@ -3410,46 +3406,42 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         } else {
             CompletableFuture<Void> replCloseFuture = new 
CompletableFuture<>();
 
-            if (TopicName.get(topic).isGlobal()) {
-                // For global namespace, close repl producers first.
-                // Once all repl producers are closed, we can delete the topic,
-                // provided no remote producers connected to the broker.
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Global topic inactive for {} seconds, 
closing repl producers.", topic,
-                        maxInactiveDurationInSec);
-                }
-                /**
-                 * There is a race condition that may cause a NPE:
-                 * - task 1: a callback of "replicator.cursor.asyncRead" will 
trigger a replication.
-                 * - task 2: "closeReplProducersIfNoBacklog" called by current 
thread will make the variable
-                 *   "replicator.producer" to a null value.
-                 * Race condition: task 1 will get a NPE when it tries to send 
messages using the variable
-                 * "replicator.producer", because task 2 will set this 
variable to "null".
-                 * TODO Create a seperated PR to fix it.
-                 */
-                closeReplProducersIfNoBacklog().thenRun(() -> {
-                    if (hasRemoteProducers()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Global topic has connected remote 
producers. Not a candidate for GC",
-                                    topic);
-                        }
-                        replCloseFuture
-                                .completeExceptionally(new 
TopicBusyException("Topic has connected remote producers"));
-                    } else {
-                        log.info("[{}] Global topic inactive for {} seconds, 
closed repl producers", topic,
-                            maxInactiveDurationInSec);
-                        replCloseFuture.complete(null);
-                    }
-                }).exceptionally(e -> {
+            // Close repl producers first.
+            // Once all repl producers are closed, we can delete the topic,
+            // provided no remote producers connected to the broker.
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Topic inactive for {} seconds, closing repl 
producers.", topic,
+                    maxInactiveDurationInSec);
+            }
+            /**
+             * There is a race condition that may cause a NPE:
+             * - task 1: a callback of "replicator.cursor.asyncRead" will 
trigger a replication.
+             * - task 2: "closeReplProducersIfNoBacklog" called by current 
thread will make the variable
+             *   "replicator.producer" to a null value.
+             * Race condition: task 1 will get a NPE when it tries to send 
messages using the variable
+             * "replicator.producer", because task 2 will set this variable to 
"null".
+             * TODO Create a separate PR to fix it.
+             */
+            closeReplProducersIfNoBacklog().thenRun(() -> {
+                if (hasRemoteProducers()) {
                     if (log.isDebugEnabled()) {
-                        log.debug("[{}] Global topic has replication backlog. 
Not a candidate for GC", topic);
+                        log.debug("[{}] Topic has connected remote producers. 
Not a candidate for GC",
+                                topic);
                     }
-                    replCloseFuture.completeExceptionally(e.getCause());
-                    return null;
-                });
-            } else {
-                replCloseFuture.complete(null);
-            }
+                    replCloseFuture
+                            .completeExceptionally(new 
TopicBusyException("Topic has connected remote producers"));
+                } else {
+                    log.info("[{}] Topic inactive for {} seconds, closed repl 
producers", topic,
+                        maxInactiveDurationInSec);
+                    replCloseFuture.complete(null);
+                }
+            }).exceptionally(e -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Topic has replication backlog. Not a 
candidate for GC", topic);
+                }
+                replCloseFuture.completeExceptionally(e.getCause());
+                return null;
+            });
 
             replCloseFuture.thenCompose(v -> delete(deleteMode == 
InactiveTopicDeleteMode.delete_when_no_subscriptions,
                 deleteMode == 
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false))
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 407458a1090..3fe51df90f7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -872,7 +872,7 @@ public abstract class PulsarWebResource {
     public static CompletableFuture<ClusterDataImpl> 
checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
                                                                                
      NamespaceName namespace,
                                                                                
      boolean allowDeletedNamespace) {
-        if (!namespace.isGlobal() || 
NamespaceService.isSLAOrHeartbeatNamespace(namespace.toString())) {
+        if (NamespaceService.isSLAOrHeartbeatNamespace(namespace.toString())) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index ca3f039fea4..c40e8a84c5e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -223,8 +223,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         admin.clusters().createCluster("use", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
         admin.clusters().createCluster("usw", 
ClusterData.builder().serviceUrl("http://127.0.0.2:8082").build());
         admin.clusters().createCluster("usc", 
ClusterData.builder().serviceUrl("http://127.0.0.3:8083").build());
-        // After V1 removal, all namespaces go through the peer-cluster 
redirect path
-        // (NamespaceName.isGlobal() always returns true), so peer clusters 
must be configured.
+        // All namespaces go through the peer-cluster redirect path, so peer 
clusters must be configured.
         // Only "usc" is a peer because peer clusters cannot also be 
replication clusters.
         admin.clusters().updatePeerClusterNames("use",
                 new LinkedHashSet<>(List.of("usc")));
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
index a2e13125a96..bda21502dfb 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
@@ -119,10 +119,6 @@ public class NamespaceName implements ServiceUnitId {
         return localName;
     }
 
-    public boolean isGlobal() {
-        return true;
-    }
-
     public String getPersistentTopicName(String localTopic) {
         return getTopicName(TopicDomain.persistent, localTopic);
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index ad14edabde0..a1c7055a897 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -352,10 +352,6 @@ public class TopicName implements ServiceUnitId {
         return String.format("%s/%s/%s/%s", domain, tenant, namespacePortion, 
getEncodedLocalName());
     }
 
-    public boolean isGlobal() {
-        return namespaceName.isGlobal();
-    }
-
     public String getSchemaName() {
         return getTenant()
             + "/" + getNamespacePortion()
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
index 5d2579ce4f0..24b86daa562 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.common.naming;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
 import org.testng.annotations.Test;
 
 public class NamespaceNameTest {
@@ -114,7 +113,6 @@ public class NamespaceNameTest {
         NamespaceName ns = NamespaceName.get("my-tenant/my-namespace");
         assertEquals(ns.getTenant(), "my-tenant");
         assertEquals(ns.getLocalName(), "my-namespace");
-        assertTrue(ns.isGlobal());
         assertEquals(ns.getPersistentTopicName("my-topic"), 
"persistent://my-tenant/my-namespace/my-topic");
     }
 }

Reply via email to