codelipenghui commented on code in PR #25304:
URL: https://github.com/apache/pulsar/pull/25304#discussion_r2920544503


##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java:
##########
@@ -133,40 +126,20 @@ private TopicName(String completeTopicName) {
                 }
             }
 
-            // The fully qualified topic name can be in two different forms:
-            // new:    persistent://tenant/namespace/topic
-            // legacy: persistent://tenant/cluster/namespace/topic
-
+            // Expected format: persistent://tenant/namespace/topic
             List<String> parts = 
Splitter.on("://").limit(2).splitToList(completeTopicName);
             this.domain = TopicDomain.getEnum(parts.get(0));
 
             String rest = parts.get(1);
 
-            // The rest of the name can be in different forms:
-            // new:    tenant/namespace/<localName>
-            // legacy: tenant/cluster/namespace/<localName>
-            // Examples of localName:
-            // 1. some, name, xyz
-            // 2. xyz-123, feeder-2
-
-
-            parts = Splitter.on("/").limit(4).splitToList(rest);
+            // Expected format: tenant/namespace/<localName>

Review Comment:
   `limit(3)` silently reinterprets V1 topic names instead of rejecting them.
   
   For `persistent://tenant/cluster/namespace/topic`, this splits into 
`["tenant", "cluster", "namespace/topic"]` and is accepted as a valid V2 topic 
with tenant="tenant", namespace="cluster", localName="namespace/topic".
   
   Users with old V1 topic names would **silently produce/consume on the wrong 
namespace**. Unlike `NamespaceName` which rejects 3-part names with a clear 
error, `TopicName` silently accepts.
   
   Suggestion: keep `limit(4)`, detect `parts.size() == 4`, and throw:
   ```java
   } else if (parts.size() == 4) {
       throw new IllegalArgumentException(
           "V1 topic names (with cluster component) are no longer supported. "
           + "Please use the V2 format: '<domain>://tenant/namespace/topic'. 
Got: " + completeTopicName);
   }
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -562,63 +549,51 @@ protected CompletableFuture<Void> 
internalDeleteNamespaceBundleAsync(String bund
                 clientAppId(), namespaceName, bundleRange, authoritative, 
force);
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.DELETE_BUNDLE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> {
-                    if (!namespaceName.isGlobal()) {
-                        return 
validateClusterOwnershipAsync(namespaceName.getCluster());
-                    }
-                    return CompletableFuture.completedFuture(null);
-                })
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
                 .thenCompose(policies -> {
                     CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
-                    if (namespaceName.isGlobal()) {
-                        // Just keep the behavior of V1 namespace being the 
same as before.
-                        if (!namespaceName.isV2() && 
policies.replication_clusters.isEmpty()
-                                && policies.allowed_clusters.isEmpty()) {
-                            return CompletableFuture.completedFuture(null);
-                        }
-                        String cluster = 
policies.getClusterThatCanDeleteNamespace();
-                        if (cluster == null) {
-                            // There are still more than one clusters 
configured for the global namespace
-                            throw new 
RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
-                                    + namespaceName
-                                    + ". There are still more than one 
replication clusters configured.");
-                        }
-                        if (!cluster.equals(config().getClusterName())) { // 
No need to change.
-                            // the only replication cluster is other cluster, 
redirect
-                            future = 
clusterResources().getClusterAsync(cluster)
-                                    .thenCompose(clusterData -> {
-                                        if (clusterData.isEmpty()) {
-                                            throw new 
RestException(Status.NOT_FOUND,
-                                                    "Cluster " + cluster + " 
does not exist");
-                                        }
-                                        ClusterData replClusterData = 
clusterData.get();
-                                        URL replClusterUrl;
-                                        try {
-                                            if (!config().isTlsEnabled() || 
!isRequestHttps()) {
-                                                replClusterUrl = new 
URL(replClusterData.getServiceUrl());
-                                            } else if 
(StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
-                                                replClusterUrl = new 
URL(replClusterData.getServiceUrlTls());
-                                            } else {
-                                                throw new 
RestException(Status.PRECONDITION_FAILED,
-                                                        "The replication 
cluster does not provide TLS encrypted "
-                                                                + "service");
-                                            }
-                                        } catch (MalformedURLException 
malformedURLException) {
-                                            throw new 
RestException(malformedURLException);
+                    String cluster = 
policies.getClusterThatCanDeleteNamespace();
+                    if (cluster == null) {

Review Comment:
   This unconditionally calls `getClusterThatCanDeleteNamespace()`, which 
returns `null` when `replication_clusters.size() \!= 1` — including when it's 
**0**.
   
   Namespaces with zero replication clusters (common in single-cluster 
deployments) will fail to delete with the misleading error: *"more than one 
replication clusters configured"* when there are actually zero.
   
   Previously, the `isGlobal()` + `isV2()` guards allowed non-global/V1 
namespaces to skip this check. Now with `isGlobal()` always true, all 
namespaces hit this path.
   
   The same issue exists in `precheckWhenDeleteNamespace()` above.
   
   Fix:
   ```java
   String cluster = policies.getClusterThatCanDeleteNamespace();
   if (cluster == null) {
       if (policies.replication_clusters.isEmpty() && 
policies.allowed_clusters.isEmpty()) {
           // No replication configured, allow local deletion
       } else {
           throw new RestException(Status.PRECONDITION_FAILED, ...);
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to