mattisonchao commented on code in PR #15518:
URL: https://github.com/apache/pulsar/pull/15518#discussion_r869232194
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -187,7 +208,17 @@ public void createNamespace(@PathParam("property") String property, @PathParam("
policies.bundles = getBundles(defaultNumberOfBundles);
}
- internalCreateNamespace(policies);
+ internalCreateNamespace(policies)
+ .thenAccept(__ -> response.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable root = FutureUtil.unwrapCompletionException(ex);
Review Comment:
Should we add an error log here?
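A minimal sketch of what such a log could look like, for illustration only. It assumes `java.util.logging` in place of the broker's own logger, and `unwrap`, `internalCreateNamespace(boolean)`, and the string return values are hypothetical stand-ins so the example runs without Pulsar or JAX-RS on the classpath:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Level;
import java.util.logging.Logger;

class CreateNamespaceLoggingSketch {
    private static final Logger LOG = Logger.getLogger("CreateNamespaceLoggingSketch");

    // Hypothetical stand-in for FutureUtil.unwrapCompletionException.
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    // Hypothetical stand-in for internalCreateNamespace(policies).
    static CompletableFuture<Void> internalCreateNamespace(boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("Namespace already exists"));
        } else {
            future.complete(null);
        }
        return future;
    }

    // Returns a string instead of resuming an AsyncResponse to stay self-contained.
    static String createNamespace(String namespace, boolean fail) {
        return internalCreateNamespace(fail)
                .thenApply(ignored -> "204 No Content")
                .exceptionally(ex -> {
                    Throwable root = unwrap(ex);
                    // The error log the review comment asks about.
                    LOG.log(Level.SEVERE, "Failed to create namespace " + namespace, root);
                    return "error: " + root.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(createNamespace("tenant/ns", false));
        System.out.println(createNamespace("tenant/ns", true));
    }
}
```

Logging the unwrapped cause rather than the `CompletionException` keeps the stack trace focused on the original failure.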
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -139,11 +155,23 @@ public Policies getPolicies(@PathParam("tenant") String tenant, @PathParam("name
@ApiResponse(code = 404, message = "Tenant or cluster doesn't exist"),
@ApiResponse(code = 409, message = "Namespace already exists"),
@ApiResponse(code = 412, message = "Namespace name is not valid")
})
- public void createNamespace(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @ApiParam(value = "Policies for the namespace") Policies policies) {
+ public void createNamespace(@Suspended AsyncResponse response,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Policies for the namespace") Policies policies) {
validateNamespaceName(tenant, namespace);
policies = getDefaultPolicesIfNull(policies);
- internalCreateNamespace(policies);
+ internalCreateNamespace(policies)
+ .thenAccept(__ -> response.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable root = FutureUtil.unwrapCompletionException(ex);
Review Comment:
Should we add an error log here?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java:
##########
@@ -187,7 +208,17 @@ public void createNamespace(@PathParam("property") String property, @PathParam("
policies.bundles = getBundles(defaultNumberOfBundles);
}
- internalCreateNamespace(policies);
+ internalCreateNamespace(policies)
Review Comment:
`validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster())` on line 196 looks like a synchronous call. Could it be made async as well?
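A rough sketch of how an async validation step could be chained in front of the create call. Everything here is an assumption for illustration: `validateClusterForTenantAsync`, the in-memory `TENANT_CLUSTERS` map, and the string results are hypothetical and not taken from the PR:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AsyncValidationSketch {
    // Hypothetical in-memory replacement for the tenant metadata lookup.
    static final Map<String, Set<String>> TENANT_CLUSTERS =
            Collections.singletonMap("tenant-a", Collections.singleton("cluster-1"));

    // Hypothetical async variant: fails the future instead of throwing on the caller thread.
    static CompletableFuture<Void> validateClusterForTenantAsync(String tenant, String cluster) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        Set<String> allowed = TENANT_CLUSTERS.get(tenant);
        if (allowed != null && allowed.contains(cluster)) {
            result.complete(null);
        } else {
            result.completeExceptionally(new IllegalArgumentException(
                    "Cluster " + cluster + " is not allowed for tenant " + tenant));
        }
        return result;
    }

    // Hypothetical stand-in for internalCreateNamespace(policies).
    static CompletableFuture<Void> internalCreateNamespace(String namespace) {
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<String> createNamespace(String tenant, String cluster, String namespace) {
        return validateClusterForTenantAsync(tenant, cluster)
                // The create step only runs if validation completed normally.
                .thenCompose(ignored -> internalCreateNamespace(namespace))
                .thenApply(ignored -> "created")
                .exceptionally(ex -> {
                    Throwable root = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    return "rejected: " + root.getMessage();
                });
    }

    public static void main(String[] args) {
        System.out.println(createNamespace("tenant-a", "cluster-1", "ns").join());
        System.out.println(createNamespace("tenant-a", "cluster-2", "ns").join());
    }
}
```

With this shape, a validation failure flows into the same `exceptionally` handler as a create failure, so the request thread never blocks on the lookup.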
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java:
##########
@@ -110,6 +110,40 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
return namespaces;
}
+ public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
+ List<String> namespaces = new ArrayList<>();
+ // this will return a cluster in v1 and a namespace in v2
+ CompletableFuture<List<CompletableFuture<Void>>> result = getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
+ .thenApply(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
+ getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
+ .thenCompose(children -> {
+ CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
+ if (children == null || children.isEmpty()) {
+ String namespace = NamespaceName.get(tenant, key).toString();
+ // if the length is 0 then this is probably a leftover cluster from namespace
+ // created with the v1 admin format (prop/cluster/ns) and then deleted, so no
+ // need to add it to the list
+ ret = ret.thenCompose(__ -> getAsync(joinPath(BASE_POLICIES_PATH, namespace))
+ .thenAccept(opt -> opt.map(k -> namespaces.add(namespace)))
+ .exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof MetadataStoreException
+ .ContentDeserializationException) {
+ return null;
+ }
+ throw FutureUtil.wrapToCompletionException(ex);
+ }));
+ } else {
+ children.forEach(ns -> {
Review Comment:
Maybe we can use `Collectors.toList()` instead of declaring the list below and adding to it from the callbacks. That would improve readability.
`List<String> namespaces = new ArrayList<>();`
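A simplified sketch of the collector-based shape, assuming an in-memory `CHILDREN` map in place of the metadata store. The paths, `namespacesFor`, and this `getChildrenAsync` are hypothetical stand-ins, and the sketch leaves out the existence check and the deserialization error handling from the diff:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class ListNamespacesSketch {
    // Hypothetical in-memory tree standing in for the metadata store.
    static final Map<String, List<String>> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put("/policies/t", Arrays.asList("cluster-1", "ns-v2"));
        CHILDREN.put("/policies/t/cluster-1", Arrays.asList("ns-a", "ns-b"));
        CHILDREN.put("/policies/t/ns-v2", Collections.<String>emptyList());
    }

    static CompletableFuture<List<String>> getChildrenAsync(String path) {
        return CompletableFuture.completedFuture(
                CHILDREN.getOrDefault(path, Collections.<String>emptyList()));
    }

    // No children: treat the key as a v2 namespace; otherwise the key is a v1 cluster.
    static List<String> namespacesFor(String tenant, String key, List<String> children) {
        if (children.isEmpty()) {
            return Collections.singletonList(tenant + "/" + key);
        }
        return children.stream()
                .map(ns -> tenant + "/" + key + "/" + ns)
                .collect(Collectors.toList());
    }

    static CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
        String base = "/policies/" + tenant;
        return getChildrenAsync(base).thenCompose(keys -> {
            // Each future yields its own list, so no shared mutable list is needed.
            List<CompletableFuture<List<String>>> perKey = keys.stream()
                    .map(key -> getChildrenAsync(base + "/" + key)
                            .thenApply(children -> namespacesFor(tenant, key, children)))
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(perKey.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> perKey.stream()
                            .flatMap(f -> f.join().stream()) // safe: allOf has completed
                            .collect(Collectors.toList()));
        });
    }

    public static void main(String[] args) {
        System.out.println(getListOfNamespacesAsync("t").join());
    }
}
```

Because every callback returns its own result, nothing is appended to an `ArrayList` from concurrent completions, which also sidesteps any thread-safety question about the shared list.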
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##########
@@ -323,6 +323,11 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
+ if (policies.get().is_allow_auto_update_schema == null) {
Review Comment:
Question: why was this part of the code added?