shibd commented on code in PR #16313:
URL: https://github.com/apache/pulsar/pull/16313#discussion_r912379600
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1074,76 +1105,50 @@ protected BookieAffinityGroupData
internalGetBookieAffinityGroup() {
}
}
- @SuppressWarnings("deprecation")
- public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse,
String bundleRange, boolean authoritative) {
- validateSuperUserAccess();
- checkNotNull(bundleRange, "BundleRange should not be null");
- log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- NamespaceBundle bundle =
- pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundle(namespaceName.toString(), bundleRange);
- boolean isOwnedByLocalCluster = false;
- try {
- isOwnedByLocalCluster =
pulsar().getNamespaceService().isNamespaceBundleOwned(bundle).get();
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("Failed to validate cluster ownership for {}-{}, {}",
- namespaceName.toString(), bundleRange, e.getMessage(),
e);
- }
- }
-
- // validate namespace ownership only if namespace is not owned by
local-cluster (it happens when broker doesn't
- // receive replication-cluster change watch and still owning bundle
- if (!isOwnedByLocalCluster) {
- if (namespaceName.isGlobal()) {
- // check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
- } else {
- validateClusterOwnership(namespaceName.getCluster());
- validateClusterForTenant(namespaceName.getTenant(),
namespaceName.getCluster());
- }
- }
-
- validatePoliciesReadOnlyAccess();
-
- isBundleOwnedByAnyBroker(namespaceName, policies.bundles,
bundleRange).thenAccept(flag -> {
- if (!flag) {
- log.info("[{}] Namespace bundle is not owned by any broker
{}/{}", clientAppId(), namespaceName,
- bundleRange);
- asyncResponse.resume(Response.noContent().build());
- return;
- }
- NamespaceBundle nsBundle;
-
- try {
- nsBundle = validateNamespaceBundleOwnership(namespaceName,
policies.bundles, bundleRange,
- authoritative, true);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- return;
- }
-
- pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded namespace bundle
{}", clientAppId(), nsBundle.toString());
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange,
- ex);
- asyncResponse.resume(new RestException(ex));
- return null;
- });
- }).exceptionally((ex) -> {
- if (ex.getCause() instanceof WebApplicationException) {
- asyncResponse.resume(ex.getCause());
- } else {
- asyncResponse.resume(new RestException(ex.getCause()));
- }
- return null;
- });
+ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String
bundleRange, boolean authoritative) {
+ return validateSuperUserAccessAsync()
+ .thenAccept(__ -> {
+ checkNotNull(bundleRange, "BundleRange should not be
null");
+ log.info("[{}] Unloading namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange);
+ })
+ .thenApply(__ ->
+ pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(namespaceName.toString(),
bundleRange)
+ )
+ .thenCompose(bundle ->
pulsar().getNamespaceService().isNamespaceBundleOwned(bundle))
Review Comment:
Need to keep the same logic as the original? original logic: when throw
exception, isOwnedByLocalCluster is false.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]