This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 183340776f3 [improve][broker] Make some methods of `BrokersBase` pure
async. (#15280)
183340776f3 is described below
commit 183340776f3a5248ec384ea6f94fff7c3f38bc03
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Apr 27 17:10:40 2022 +0800
[improve][broker] Make some methods of `BrokersBase` pure async. (#15280)
---
.../broker/resources/NamespaceResources.java | 6 +++
.../pulsar/broker/admin/impl/BrokersBase.java | 62 ++++++++++++----------
.../pulsar/broker/namespace/NamespaceService.java | 35 ++++++------
.../pulsar/broker/namespace/OwnershipCache.java | 4 ++
4 files changed, 62 insertions(+), 45 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 3c35f32f36c..54122bbcf45 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -183,6 +183,12 @@ public class NamespaceResources extends
BaseResources<Policies> {
return data.isPresent() ? Optional.of(new
NamespaceIsolationPolicies(data.get())) : Optional.empty();
}
+ public CompletableFuture<NamespaceIsolationPolicies>
getIsolationDataPoliciesAsync(String cluster) {
+ return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster,
NAMESPACE_ISOLATION_POLICIES))
+ .thenApply(data ->
data.map(NamespaceIsolationPolicies::new)
+ .orElseGet(NamespaceIsolationPolicies::new));
+ }
+
public void deleteIsolationData(String cluster) throws
MetadataStoreException {
delete(joinPath(BASE_CLUSTERS_PATH, cluster,
NAMESPACE_ISOLATION_POLICIES));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 8e9167a886c..7f6740e3ed2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -147,20 +147,23 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the cluster"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist") })
- public Map<String, NamespaceOwnershipStatus>
getOwnedNamespaces(@PathParam("clusterName") String cluster,
- @PathParam("broker-webserviceurl") String broker) throws Exception
{
- validateSuperUserAccess();
- validateClusterOwnership(cluster);
- validateBrokerName(broker);
-
- try {
- // now we validated that this is the broker specified in the
request
- return pulsar().getNamespaceService().getOwnedNameSpacesStatus();
- } catch (Exception e) {
- LOG.error("[{}] Failed to get the namespace ownership status.
cluster={}, broker={}", clientAppId(),
- cluster, broker);
- throw new RestException(e);
- }
+ public void getOwnedNamespaces(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("clusterName") String cluster,
+ @PathParam("broker-webserviceurl") String
broker) {
+ validateSuperUserAccessAsync()
+ .thenAccept(__ -> validateBrokerName(broker))
+ .thenCompose(__ -> validateClusterOwnershipAsync(cluster))
+ .thenCompose(__ ->
pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ // If the exception is not redirect exception we need to
log it.
+ if (!isRedirectException(ex)) {
+ LOG.error("[{}] Failed to get the namespace ownership
status. cluster={}, broker={}",
+ clientAppId(), cluster, broker);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -219,17 +222,15 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 403, message = "You don't have admin permission to
view configuration"),
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 500, message = "Internal server error")})
- public Map<String, String> getAllDynamicConfigurations() throws Exception {
- validateSuperUserAccess();
- try {
- return
dynamicConfigurationResources().getDynamicConfiguration().orElseGet(Collections::emptyMap);
- } catch (RestException e) {
- LOG.error("[{}] couldn't find any configuration in zk {}",
clientAppId(), e.getMessage(), e);
- throw e;
- } catch (Exception e) {
- LOG.error("[{}] Failed to retrieve configuration from zk {}",
clientAppId(), e.getMessage(), e);
- throw new RestException(e);
- }
+ public void getAllDynamicConfigurations(@Suspended AsyncResponse
asyncResponse) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ ->
dynamicConfigurationResources().getDynamicConfigurationAsync())
+ .thenAccept(configOpt ->
asyncResponse.resume(configOpt.orElseGet(Collections::emptyMap)))
+ .exceptionally(ex -> {
+ LOG.error("[{}] Failed to get all dynamic configuration.",
clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -237,9 +238,14 @@ public class BrokersBase extends AdminResource {
@ApiOperation(value = "Get all updatable dynamic configurations's name")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "You don't have admin
permission to get configuration")})
- public List<String> getDynamicConfigurationName() {
- validateSuperUserAccess();
- return BrokerService.getDynamicConfiguration();
+ public void getDynamicConfigurationName(@Suspended AsyncResponse
asyncResponse) {
+ validateSuperUserAccessAsync()
+ .thenAccept(__ ->
asyncResponse.resume(BrokerService.getDynamicConfiguration()))
+ .exceptionally(ex -> {
+ LOG.error("[{}] Failed to get all dynamic configuration
names.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e1e1dc10676..fdfd9bf1880 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -29,8 +29,8 @@ import com.google.common.hash.Hashing;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -732,16 +732,21 @@ public class NamespaceService implements AutoCloseable {
return
pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
}
- public Map<String, NamespaceOwnershipStatus> getOwnedNameSpacesStatus()
throws Exception {
- NamespaceIsolationPolicies nsIsolationPolicies =
this.getLocalNamespaceIsolationPolicies();
- Map<String, NamespaceOwnershipStatus> ownedNsStatus = new
HashMap<String, NamespaceOwnershipStatus>();
- for (OwnedBundle nsObj :
this.ownershipCache.getOwnedBundles().values()) {
- NamespaceOwnershipStatus nsStatus =
this.getNamespaceOwnershipStatus(nsObj,
-
nsIsolationPolicies.getPolicyByNamespace(nsObj.getNamespaceBundle().getNamespaceObject()));
- ownedNsStatus.put(nsObj.getNamespaceBundle().toString(), nsStatus);
- }
-
- return ownedNsStatus;
+ public CompletableFuture<Map<String, NamespaceOwnershipStatus>>
getOwnedNameSpacesStatusAsync() {
+ return getLocalNamespaceIsolationPoliciesAsync()
+ .thenCompose(namespaceIsolationPolicies -> {
+ Collection<CompletableFuture<OwnedBundle>> futures =
+ ownershipCache.getOwnedBundlesAsync().values();
+ return FutureUtil.waitForAll(futures)
+ .thenApply(__ -> futures.stream()
+ .map(CompletableFuture::join)
+ .collect(Collectors.toMap(bundle ->
bundle.getNamespaceBundle().toString(),
+ bundle ->
getNamespaceOwnershipStatus(bundle,
+
namespaceIsolationPolicies.getPolicyByNamespace(
+
bundle.getNamespaceBundle().getNamespaceObject()))
+ ))
+ );
+ });
}
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle
nsObj,
@@ -763,14 +768,10 @@ public class NamespaceService implements AutoCloseable {
return nsOwnedStatus;
}
- private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies()
throws Exception {
+ private CompletableFuture<NamespaceIsolationPolicies>
getLocalNamespaceIsolationPoliciesAsync() {
String localCluster = pulsar.getConfiguration().getClusterName();
return
pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
- .getIsolationDataPolicies(localCluster)
- .orElseGet(() -> {
- // the namespace isolation policies are empty/undefined =
an empty object
- return new NamespaceIsolationPolicies();
- });
+ .getIsolationDataPoliciesAsync(localCluster);
}
public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws
Exception {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 3d8bf2654c6..67e986b804c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -250,6 +250,10 @@ public class OwnershipCache {
return this.ownedBundlesCache.synchronous().asMap();
}
+ public Map<NamespaceBundle, CompletableFuture<OwnedBundle>>
getOwnedBundlesAsync() {
+ return ownedBundlesCache.asMap();
+ }
+
/**
* Checked whether a particular bundle is currently owned by this broker.
*