mattisonchao commented on code in PR #15685:
URL: https://github.com/apache/pulsar/pull/15685#discussion_r879978967
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -742,38 +735,37 @@ public void deleteNamespaceIsolationPolicy(
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setFailureDomain(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The failure domain name",
- required = true
- )
+ @ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName,
- @ApiParam(
- value = "The configuration data of a failure domain",
- required = true
- )
- FailureDomainImpl domain
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
- validateBrokerExistsInOtherDomain(cluster, domainName, domain);
-
- try {
- clusterResources().getFailureDomainResources()
- .setFailureDomainWithCreate(cluster, domainName, old ->
domain);
- } catch (NotFoundException nne) {
- log.warn("[{}] Failed to update domain {}. clusters {} Does not
exist", clientAppId(), cluster,
- domainName);
- throw new RestException(Status.NOT_FOUND,
- "Domain " + domainName + " for cluster " + cluster + "
does not exist");
- } catch (Exception e) {
- log.error("[{}] Failed to update clusters/{}/domainName/{}",
clientAppId(), cluster, domainName, e);
- throw new RestException(e);
- }
+ @ApiParam(value = "The configuration data of a failure domain",
required = true) FailureDomainImpl domain
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
+ .thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster,
domainName, domain))
+ .thenCompose(__ ->
clusterResources().getFailureDomainResources()
+ .setFailureDomainWithCreateAsync(cluster, domainName,
old -> domain))
+ .thenAccept(__ -> {
+ log.info("[{}] Successful set failure domain {} for
cluster {}",
+ clientAppId(), domainName, cluster);
+ asyncResponse.resume(Response.ok().build());
Review Comment:
Fixed.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java:
##########
@@ -686,46 +687,38 @@ private CompletableFuture<Void>
filterAndUnloadMatchedNamespaceAsync(NamespaceIs
@ApiResponse(code = 500, message = "Internal server error.")
})
public void deleteNamespaceIsolationPolicy(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The namespace isolation policy name",
- required = true
- )
+ @ApiParam(value = "The namespace isolation policy name", required =
true)
@PathParam("policyName") String policyName
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
- validatePoliciesReadOnlyAccess();
-
- try {
-
- NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPolicies()
- .getIsolationDataPolicies(cluster).orElseGet(() -> {
- try {
-
namespaceIsolationPolicies().setIsolationDataWithCreate(cluster,
- (p) -> Collections.emptyMap());
- return new NamespaceIsolationPolicies();
- } catch (Exception e) {
- throw new RestException(e);
- }
- });
-
- nsIsolationPolicies.deletePolicy(policyName);
- namespaceIsolationPolicies().setIsolationData(cluster, old ->
nsIsolationPolicies.getPolicies());
- } catch (NotFoundException nne) {
- log.warn("[{}] Failed to update
brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
- cluster);
- throw new RestException(Status.NOT_FOUND,
- "NamespaceIsolationPolicies for cluster " + cluster + "
does not exist");
- } catch (Exception e) {
- log.error("[{}] Failed to update
brokers/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
- policyName, e);
- throw new RestException(e);
- }
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ ->
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster))
+ .thenCompose(nsIsolationPoliciesOpt ->
nsIsolationPoliciesOpt.map(CompletableFuture::completedFuture)
+ .orElseGet(() -> namespaceIsolationPolicies()
+ .setIsolationDataWithCreateAsync(cluster, (p)
-> Collections.emptyMap())
+ .thenApply(__ -> new
NamespaceIsolationPolicies())))
+ .thenCompose(policies -> {
+ policies.deletePolicy(policyName);
+ return
namespaceIsolationPolicies().setIsolationDataAsync(cluster, old ->
policies.getPolicies());
+ }).thenAccept(__ ->
asyncResponse.resume(Response.ok().build()))
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org