This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d4be6bb0520 [improve][broker] Make some methods of `ClustersBase` pure
async. (#15847)
d4be6bb0520 is described below
commit d4be6bb05204c270122e159c5c361cd7a2412805
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Jun 1 18:37:14 2022 +0800
[improve][broker] Make some methods of `ClustersBase` pure async. (#15847)
---
.../pulsar/broker/resources/ClusterResources.java | 5 ++
.../pulsar/broker/admin/impl/ClustersBase.java | 84 +++++++++++-----------
2 files changed, 45 insertions(+), 44 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 91639578d26..5bde79e6321 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -162,6 +162,11 @@ public class ClusterResources extends
BaseResources<ClusterData> {
delete(path);
}
+ public CompletableFuture<Void> deleteFailureDomainAsync(String
clusterName, String domainName) {
+ String path = joinPath(BASE_CLUSTERS_PATH, clusterName,
FAILURE_DOMAIN, domainName);
+ return deleteAsync(path);
+ }
+
public CompletableFuture<Void> deleteFailureDomainsAsync(String
clusterName) {
String failureDomainPath = joinPath(BASE_CLUSTERS_PATH,
clusterName, FAILURE_DOMAIN);
return existsAsync(failureDomainPath)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 034d7b5f9fe..7797f5dcaac 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -834,31 +834,26 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 412, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
- public FailureDomainImpl getDomain(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ public void getDomain(
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The failure domain name",
- required = true
- )
+ @ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
-
- try {
- return
clusterResources().getFailureDomainResources().getFailureDomain(cluster,
domainName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
+ .thenCompose(__ ->
clusterResources().getFailureDomainResources()
+ .getFailureDomainAsync(cluster, domainName))
+ .thenAccept(domain -> {
+ FailureDomainImpl failureDomain = domain.orElseThrow(() ->
new RestException(Status.NOT_FOUND,
"Domain " + domainName + " for cluster " + cluster
+ " does not exist"));
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to get domain {} for cluster {}",
clientAppId(), domainName, cluster, e);
- throw new RestException(e);
- }
+ asyncResponse.resume(failureDomain);
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get domain {} for cluster {}",
clientAppId(), domainName, cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
@@ -874,30 +869,31 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public void deleteFailureDomain(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The failure domain name",
- required = true
- )
+ @ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName
- ) throws Exception {
- validateSuperUserAccess();
- validateClusterExists(cluster);
-
- try {
-
clusterResources().getFailureDomainResources().deleteFailureDomain(cluster,
domainName);
- } catch (NotFoundException nne) {
- log.warn("[{}] Domain {} does not exist in {}", clientAppId(),
domainName, cluster);
- throw new RestException(Status.NOT_FOUND,
- "Domain-name " + domainName + " or cluster " + cluster + "
does not exist");
- } catch (Exception e) {
- log.error("[{}] Failed to delete domain {} in cluster {}",
clientAppId(), domainName, cluster, e);
- throw new RestException(e);
- }
+ ) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
+ .thenCompose(__ -> clusterResources()
+
.getFailureDomainResources().deleteFailureDomainAsync(cluster, domainName))
+ .thenAccept(__ -> {
+ log.info("[{}] Successful delete domain {} in cluster {}",
clientAppId(), domainName, cluster);
+ asyncResponse.resume(Response.ok().build());
+ }).exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof NotFoundException) {
+ log.warn("[{}] Domain {} does not exist in {}",
clientAppId(), domainName, cluster);
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "Domain-name " + domainName + " or cluster " +
cluster + " does not exist"));
+ return null;
+ }
+ log.error("[{}] Failed to delete domain {} in cluster {}",
clientAppId(), domainName, cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
private CompletableFuture<Void> validateBrokerExistsInOtherDomain(final
String cluster,