mattisonchao commented on code in PR #15518:
URL: https://github.com/apache/pulsar/pull/15518#discussion_r872030642


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java:
##########
@@ -110,6 +111,41 @@ public List<String> getListOfNamespaces(String tenant) 
throws MetadataStoreExcep
         return namespaces;
     }
 
+    public CompletableFuture<List<String>> getListOfNamespacesAsync(String 
tenant) {
+        // this will return a cluster in v1 and a namespace in v2
+        return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
+                .thenCompose(clusterOrNamespaces -> 
clusterOrNamespaces.stream().map(key ->
+                        getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, 
key))
+                                .thenCompose(children -> {
+                                    if (children == null || 
children.isEmpty()) {
+                                        String namespace = 
NamespaceName.get(tenant, key).toString();
+                                        // if the length is 0 then this is 
probably a leftover cluster from namespace
+                                        // created with the v1 admin format 
(prop/cluster/ns) and then deleted, so no
+                                        // need to add it to the list
+                                        return 
getAsync(joinPath(BASE_POLICIES_PATH, namespace))
+                                           .thenApply(opt -> opt.isPresent() ? 
Collections.singletonList(namespace)
+                                                   : new ArrayList<String>())
+                                           .exceptionally(ex -> {
+                                                Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                                                if (cause instanceof 
MetadataStoreException
+                                                        
.ContentDeserializationException) {
+                                                    return new ArrayList<>();
+                                                }
+                                                throw 
FutureUtil.wrapToCompletionException(ex);
+                                            });
+                                    } else {
+                                        CompletableFuture<List<String>> ret = 
new CompletableFuture();
+                                        ret.complete(children.stream().map(ns 
-> NamespaceName.get(tenant, key, ns)
+                                                
.toString()).collect(Collectors.toList()));
+                                        return ret;
+                                    }
+                                
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
+                                        (accumulator, n) -> 
accumulator.thenCompose(namespaces -> n.thenCompose(m -> {
+                                            namespaces.addAll(m);
+                                            return 
CompletableFuture.completedFuture(namespaces);

Review Comment:
   ```suggestion
                                               return namespaces;
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java:
##########
@@ -110,6 +111,41 @@ public List<String> getListOfNamespaces(String tenant) 
throws MetadataStoreExcep
         return namespaces;
     }
 
+    public CompletableFuture<List<String>> getListOfNamespacesAsync(String 
tenant) {
+        // this will return a cluster in v1 and a namespace in v2
+        return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
+                .thenCompose(clusterOrNamespaces -> 
clusterOrNamespaces.stream().map(key ->
+                        getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, 
key))
+                                .thenCompose(children -> {
+                                    if (children == null || 
children.isEmpty()) {
+                                        String namespace = 
NamespaceName.get(tenant, key).toString();
+                                        // if the length is 0 then this is 
probably a leftover cluster from namespace
+                                        // created with the v1 admin format 
(prop/cluster/ns) and then deleted, so no
+                                        // need to add it to the list
+                                        return 
getAsync(joinPath(BASE_POLICIES_PATH, namespace))
+                                           .thenApply(opt -> opt.isPresent() ? 
Collections.singletonList(namespace)
+                                                   : new ArrayList<String>())
+                                           .exceptionally(ex -> {
+                                                Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                                                if (cause instanceof 
MetadataStoreException
+                                                        
.ContentDeserializationException) {
+                                                    return new ArrayList<>();
+                                                }
+                                                throw 
FutureUtil.wrapToCompletionException(ex);
+                                            });
+                                    } else {
+                                        CompletableFuture<List<String>> ret = 
new CompletableFuture();
+                                        ret.complete(children.stream().map(ns 
-> NamespaceName.get(tenant, key, ns)
+                                                
.toString()).collect(Collectors.toList()));
+                                        return ret;
+                                    }
+                                
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
+                                        (accumulator, n) -> 
accumulator.thenCompose(namespaces -> n.thenCompose(m -> {

Review Comment:
   ```suggestion
                                           (accumulator, n) -> 
accumulator.thenApply(namespaces -> n.thenCompose(m -> {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -103,60 +103,55 @@
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class NamespacesBase extends AdminResource {
 
-    protected List<String> internalGetTenantNamespaces(String tenant) {
-        checkNotNull(tenant, "Tenant should not be null");
+    protected CompletableFuture<List<String>> 
internalGetTenantNamespaces(String tenant) {
+        if (tenant == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Tenant should not be null"));
+        }
         try {
             NamedEntity.checkName(tenant);
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, 
e);
-            throw new RestException(Status.PRECONDITION_FAILED, "Tenant name 
is not valid");
-        }
-        validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
-
-        try {
-            if (!tenantResources().tenantExists(tenant)) {
-                throw new RestException(Status.NOT_FOUND, "Tenant not found");
-            }
-
-            return tenantResources().getListOfNamespaces(tenant);
-        } catch (Exception e) {
-            log.error("[{}] Failed to get namespaces list: {}", clientAppId(), 
e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
         }
+        return validateTenantOperationAsync(tenant, 
TenantOperation.LIST_NAMESPACES)
+                .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
+                .thenCompose(existed -> {
+                    if (!existed) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant not 
found");
+                    }
+                    return tenantResources().getListOfNamespacesAsync(tenant);
+                });
     }
 
-    protected void internalCreateNamespace(Policies policies) {
-        validateTenantOperation(namespaceName.getTenant(), 
TenantOperation.CREATE_NAMESPACE);
-        validatePoliciesReadOnlyAccess();
-        validatePolicies(namespaceName, policies);
-
-        try {
-            int maxNamespacesPerTenant = 
pulsar().getConfiguration().getMaxNamespacesPerTenant();
-            // no distributed locks are added here.In a concurrent scenario, 
the threshold will be exceeded.
-            if (maxNamespacesPerTenant > 0) {
-                List<String> namespaces = 
tenantResources().getListOfNamespaces(namespaceName.getTenant());
-                if (namespaces != null && namespaces.size() > 
maxNamespacesPerTenant) {
-                    throw new RestException(Status.PRECONDITION_FAILED,
-                            "Exceed the maximum number of namespace in tenant 
:" + namespaceName.getTenant());
-                }
-            }
-            namespaceResources().createPolicies(namespaceName, policies);
-            log.info("[{}] Created namespace {}", clientAppId(), 
namespaceName);
-        } catch (AlreadyExistsException e) {
-            log.warn("[{}] Failed to create namespace {} - already exists", 
clientAppId(), namespaceName);
-            throw new RestException(Status.CONFLICT, "Namespace already 
exists");
-        } catch (Exception e) {
-            log.error("[{}] Failed to create namespace {}", clientAppId(), 
namespaceName, e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> internalCreateNamespace(Policies 
policies) {
+        return validateTenantOperationAsync(namespaceName.getTenant(), 
TenantOperation.CREATE_NAMESPACE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> validatePolicies(namespaceName, policies))
+                .thenCompose(__ -> {
+                    CompletableFuture<Void> ret = 
CompletableFuture.completedFuture(null);
+                    int maxNamespacesPerTenant = 
pulsar().getConfiguration().getMaxNamespacesPerTenant();
+                    // no distributed locks are added here.In a concurrent 
scenario, the threshold will be exceeded.
+                    if (maxNamespacesPerTenant > 0) {
+                        ret = 
tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())

Review Comment:
   Could we return the future directly here, to avoid creating an extra future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to