codelipenghui commented on code in PR #25304:
URL: https://github.com/apache/pulsar/pull/25304#discussion_r2920544503
##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java:
##########
@@ -133,40 +126,20 @@ private TopicName(String completeTopicName) {
}
}
- // The fully qualified topic name can be in two different forms:
- // new: persistent://tenant/namespace/topic
- // legacy: persistent://tenant/cluster/namespace/topic
-
+ // Expected format: persistent://tenant/namespace/topic
List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName);
this.domain = TopicDomain.getEnum(parts.get(0));
String rest = parts.get(1);
- // The rest of the name can be in different forms:
- // new: tenant/namespace/<localName>
- // legacy: tenant/cluster/namespace/<localName>
- // Examples of localName:
- // 1. some, name, xyz
- // 2. xyz-123, feeder-2
-
-
- parts = Splitter.on("/").limit(4).splitToList(rest);
+ // Expected format: tenant/namespace/<localName>
Review Comment:
`limit(3)` silently reinterprets V1 topic names instead of rejecting them.
For `persistent://tenant/cluster/namespace/topic`, this splits into
`["tenant", "cluster", "namespace/topic"]` and is accepted as a valid V2 topic
with tenant="tenant", namespace="cluster", localName="namespace/topic".
Users with old V1 topic names would **silently produce/consume on the wrong
namespace**. Unlike `NamespaceName`, which rejects 3-part names with a clear
error, `TopicName` silently accepts them.
Suggestion: keep `limit(4)`, detect `parts.size() == 4`, and throw:
```java
} else if (parts.size() == 4) {
    throw new IllegalArgumentException(
            "V1 topic names (with cluster component) are no longer supported. "
            + "Please use the V2 format: '<domain>://tenant/namespace/topic'. Got: "
            + completeTopicName);
}
```
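To make the difference concrete, here is a minimal, self-contained sketch of the two split limits. It uses `String.split` as a stand-in for Guava's `Splitter.limit(n)`, and the class and method names (`TopicSplitDemo`, `splitRest`, `parseV2`) are hypothetical, not part of `TopicName`:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: a stand-in for the parsing step, not the real TopicName code.
final class TopicSplitDemo {

    // Splits the part after "://" into at most `limit` pieces.
    // String.split with a positive limit is assumed here to mirror Splitter.limit(n).
    static List<String> splitRest(String rest, int limit) {
        return Arrays.asList(rest.split("/", limit));
    }

    // Hypothetical V2-only parser: split with limit 4 so a fourth
    // component can be detected and rejected instead of being absorbed.
    static List<String> parseV2(String rest) {
        List<String> parts = splitRest(rest, 4);
        if (parts.size() == 4) {
            throw new IllegalArgumentException(
                    "V1 topic names (with cluster component) are no longer supported. Got: " + rest);
        }
        if (parts.size() != 3) {
            throw new IllegalArgumentException("Invalid topic name: " + rest);
        }
        return parts;
    }

    public static void main(String[] args) {
        // limit 3: the V1 name is absorbed as tenant / namespace="cluster" / localName="namespace/topic"
        System.out.println(splitRest("tenant/cluster/namespace/topic", 3));
        // limit 4: the extra component is visible, so it can be rejected
        System.out.println(splitRest("tenant/cluster/namespace/topic", 4));
        // A plain V2 name still parses into three parts
        System.out.println(parseV2("tenant/namespace/topic"));
    }
}
```

Note that under this scheme any name with four or more slash-separated components is rejected, so a V2 local name containing `/` would be refused as well. Whether that is acceptable is a separate decision for this PR.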
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -562,63 +549,51 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
clientAppId(), namespaceName, bundleRange, authoritative, force);
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
- .thenCompose(__ -> {
- if (!namespaceName.isGlobal()) {
- return validateClusterOwnershipAsync(namespaceName.getCluster());
- }
- return CompletableFuture.completedFuture(null);
- })
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> {
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
- if (namespaceName.isGlobal()) {
- // Just keep the behavior of V1 namespace being the same as before.
- if (!namespaceName.isV2() && policies.replication_clusters.isEmpty()
- && policies.allowed_clusters.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
- String cluster = policies.getClusterThatCanDeleteNamespace();
- if (cluster == null) {
- // There are still more than one clusters configured for the global namespace
- throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
- + namespaceName
- + ". There are still more than one replication clusters configured.");
- }
- if (!cluster.equals(config().getClusterName())) { // No need to change.
- // the only replication cluster is other cluster, redirect
- future = clusterResources().getClusterAsync(cluster)
- .thenCompose(clusterData -> {
- if (clusterData.isEmpty()) {
- throw new RestException(Status.NOT_FOUND,
- "Cluster " + cluster + " does not exist");
- }
- ClusterData replClusterData = clusterData.get();
- URL replClusterUrl;
- try {
- if (!config().isTlsEnabled() || !isRequestHttps()) {
- replClusterUrl = new URL(replClusterData.getServiceUrl());
- } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
- replClusterUrl = new URL(replClusterData.getServiceUrlTls());
- } else {
- throw new RestException(Status.PRECONDITION_FAILED,
- "The replication cluster does not provide TLS encrypted "
- + "service");
- }
- } catch (MalformedURLException malformedURLException) {
- throw new RestException(malformedURLException);
+ String cluster = policies.getClusterThatCanDeleteNamespace();
+ if (cluster == null) {
Review Comment:
This unconditionally calls `getClusterThatCanDeleteNamespace()`, which
returns `null` when `replication_clusters.size() != 1`, including when it's
**0**.
Namespaces with zero replication clusters (common in single-cluster
deployments) will fail to delete with the misleading error: *"more than one
replication clusters configured"* when there are actually zero.
Previously, the `isGlobal()` + `isV2()` guards allowed non-global/V1
namespaces to skip this check. Now with `isGlobal()` always true, all
namespaces hit this path.
The same issue exists in `precheckWhenDeleteNamespace()` above.
Fix:
```java
String cluster = policies.getClusterThatCanDeleteNamespace();
if (cluster == null) {
    if (policies.replication_clusters.isEmpty() && policies.allowed_clusters.isEmpty()) {
        // No replication configured, allow local deletion
    } else {
        throw new RestException(Status.PRECONDITION_FAILED, ...);
    }
}
```
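The intended decision table can be sketched in isolation as follows. This is a simplified model, not the broker code: `clusterThatCanDelete` is a hypothetical stand-in that only reproduces the behavior described above (return the single replication cluster, otherwise `null`), and `Decision`, `decide`, and `DeleteCheckDemo` are illustrative names.

```java
import java.util.Set;

// Illustrative model of the delete pre-check; not the actual NamespacesBase logic.
final class DeleteCheckDemo {

    enum Decision { DELETE_LOCALLY, REDIRECT, REJECT }

    // Assumed behavior, per the description above: the single configured
    // replication cluster, or null when the set size is anything other than 1.
    static String clusterThatCanDelete(Set<String> replicationClusters) {
        return replicationClusters.size() == 1 ? replicationClusters.iterator().next() : null;
    }

    static Decision decide(Set<String> replicationClusters,
                           Set<String> allowedClusters,
                           String localCluster) {
        String cluster = clusterThatCanDelete(replicationClusters);
        if (cluster == null) {
            // Zero clusters configured: nothing to coordinate, delete locally.
            if (replicationClusters.isEmpty() && allowedClusters.isEmpty()) {
                return Decision.DELETE_LOCALLY;
            }
            // Clusters are still configured but there is no single owner: precondition failed.
            return Decision.REJECT;
        }
        // Exactly one cluster: delete here if it is this cluster, otherwise redirect.
        return cluster.equals(localCluster) ? Decision.DELETE_LOCALLY : Decision.REDIRECT;
    }

    public static void main(String[] args) {
        System.out.println(decide(Set.of(), Set.of(), "c1"));            // DELETE_LOCALLY
        System.out.println(decide(Set.of("c1", "c2"), Set.of(), "c1"));  // REJECT
        System.out.println(decide(Set.of("c2"), Set.of(), "c1"));        // REDIRECT
    }
}
```

The point of the model is the first branch: without the emptiness check, the zero-cluster case falls into `REJECT` and surfaces the "more than one replication clusters" message.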