oneby-wang commented on code in PR #25086:
URL: https://github.com/apache/pulsar/pull/25086#discussion_r2626608923


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1842,104 +1811,69 @@ protected void 
internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliver
         internalSetPolicies("delayed_delivery_policies", 
delayedDeliveryPolicies);
     }
 
-    protected void internalSetNamespaceAntiAffinityGroup(String 
antiAffinityGroup) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
-        checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be 
null");
-        validatePoliciesReadOnlyAccess();
-
-        log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), 
antiAffinityGroup, namespaceName);
-
-        if (isBlank(antiAffinityGroup)) {
-            throw new RestException(Status.PRECONDITION_FAILED, 
"antiAffinityGroup can't be empty");
-        }
-
-        try {
-            getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
-                lp.map(policies -> new LocalPolicies(policies.bundles,
-                        policies.bookieAffinityGroup,
-                        antiAffinityGroup,
-                        policies.migrated))
-                        .orElseGet(() -> new 
LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup))
-            );
-            log.info("[{}] Successfully updated local-policies configuration: 
namespace={}, map={}", clientAppId(),
-                    namespaceName, antiAffinityGroup);
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to update local-policy configuration for 
namespace {}", clientAppId(), namespaceName,
-                    e);
-            throw new RestException(e);
-        }
-    }
-
-    protected String internalGetNamespaceAntiAffinityGroup() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
-
-        try {
-            return getLocalPolicies()
-                    .getLocalPolicies(namespaceName)
-                    .orElseGet(() -> new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles())
-                            , null, null)).namespaceAntiAffinityGroup;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get the antiAffinityGroup of namespace 
{}", clientAppId(), namespaceName, e);
-            throw new RestException(Status.NOT_FOUND, "Couldn't find namespace 
policies");
-        }
-    }
-
-    protected void internalRemoveNamespaceAntiAffinityGroup() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-
-        log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), 
namespaceName);
-
-        try {
-            getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
-                new LocalPolicies(policies.bundles,
-                        policies.bookieAffinityGroup,
-                        null,
-                        policies.migrated));
-            log.info("[{}] Successfully removed anti-affinity group for a 
namespace={}", clientAppId(), namespaceName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to remove anti-affinity group for namespace 
{}", clientAppId(), namespaceName, e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> 
internalSetNamespaceAntiAffinityGroupAsync(String antiAffinityGroup) {
+        checkNotNull(antiAffinityGroup, "Anti-affinity group should not be 
null");
+        checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
+                .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync()).thenCompose(
+                        __ -> getDefaultBundleDataAsync().thenCompose(
+                                defaultBundleData -> 
getLocalPolicies().setLocalPoliciesWithCreateAsync(namespaceName,
+                                        oldPolicies -> 
oldPolicies.map(policies -> new LocalPolicies(policies.bundles,
+                                                        
policies.bookieAffinityGroup, antiAffinityGroup,
+                                                        policies.migrated))
+                                                .orElseGet(() -> new 
LocalPolicies(defaultBundleData, null,
+                                                        antiAffinityGroup)))))
+                .thenAccept(__ -> log.info(
+                        "[{}] Successfully updated namespace anti-affinity 
group, namespace={}, anti-affinity"
+                                + " group={}", clientAppId(), namespaceName, 
antiAffinityGroup));
+    }
+
+    protected CompletableFuture<String> 
internalGetNamespaceAntiAffinityGroupAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
+                .thenCompose(__ -> 
getLocalPolicies().getLocalPoliciesAsync(namespaceName)
+                .thenApply(policiesOpt -> policiesOpt.map(localPolicies -> 
localPolicies.namespaceAntiAffinityGroup)
+                        .orElse(null)));
+    }
+
+    protected CompletableFuture<Void> 
internalRemoveNamespaceAntiAffinityGroupAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> {
+                    log.info("[{}] Removing anti-affinity group for namespace: 
{}", clientAppId(), namespaceName);
+                    return 
getLocalPolicies().setLocalPoliciesAsync(namespaceName,
+                            (policies) -> new LocalPolicies(policies.bundles, 
policies.bookieAffinityGroup, null,
+                                    policies.migrated));
+                })
+                .thenAccept(__ -> log.info("[{}] Successfully removed 
anti-affinity group for namespace: {}",
+                        clientAppId(), namespaceName));
     }
 
-    protected List<String> internalGetAntiAffinityNamespaces(String cluster, 
String antiAffinityGroup,
-                                                             String tenant) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
+    protected CompletableFuture<List<String>> 
internalGetAntiAffinityNamespacesAsync(String cluster,
+                                                                               
      String antiAffinityGroup,
+                                                                               
      String tenant) {
         checkNotNull(cluster, "Cluster should not be null");
-        checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be 
null");
+        checkNotNull(antiAffinityGroup, "Anti-affinity group should not be 
null");
         checkNotNull(tenant, "Tenant should not be null");
+        checkNotBlank(antiAffinityGroup, "Anti-affinity group can't be empty");
 
-        log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), 
tenant, antiAffinityGroup, cluster);
-
-        if (isBlank(antiAffinityGroup)) {
-            throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity 
group can't be empty.");
-        }
-        validateClusterExists(cluster);
-
-        try {
-            List<String> namespaces = 
tenantResources().getListOfNamespaces(tenant);
-
-            return namespaces.stream().filter(ns -> {
-                Optional<LocalPolicies> policies;
-                try {
-                    policies = 
getLocalPolicies().getLocalPolicies(NamespaceName.get(ns));
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-
-                String storedAntiAffinityGroup = policies.orElseGet(() ->
-                        new 
LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
-                                null, null)).namespaceAntiAffinityGroup;
-                return 
antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup);
-            }).collect(Collectors.toList());
-
-        } catch (Exception e) {
-            log.warn("Failed to list of properties/namespace from global-zk", 
e);
-            throw new RestException(e);
-        }
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ANTI_AFFINITY, PolicyOperation.READ)
+                .thenCompose(__ -> validateClusterExistsAsync(cluster))
+                .thenCompose(__ -> {
+                    log.info("[{}]-{} Finding namespaces for {} in {}", 
clientAppId(), tenant, antiAffinityGroup,
+                            cluster);
+                    return 
tenantResources().getListOfNamespacesAsync(tenant).thenCompose(namespaces -> {
+                        List<CompletableFuture<String>> nsFutures = 
namespaces.stream()
+                                .map(ns -> 
getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(ns))
+                                        .thenApply(policiesOpt -> 
policiesOpt.map(
+                                                localPolicies -> 
localPolicies.namespaceAntiAffinityGroup).orElse(null))
+                                        
.thenApply(antiAffinityGroup::equalsIgnoreCase)

Review Comment:
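   The argument passed to `antiAffinityGroup::equalsIgnoreCase` may be `null` here
   (from `orElse(null)`), but `String#equalsIgnoreCase()` handles a `null` argument
   by returning `false` instead of throwing. Source code: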
   
   ```java
     public boolean equalsIgnoreCase(String anotherString) {
         return (this == anotherString) ? true
                 : (anotherString != null)
                 && (anotherString.length() == length())
                 && regionMatches(true, 0, anotherString, 0, length());
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to