lifepuzzlefun commented on code in PR #20666:
URL: https://github.com/apache/pulsar/pull/20666#discussion_r1245295502


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -489,24 +489,27 @@ public CompletableFuture<Optional<String>> 
selectAsync(ServiceUnitId bundle,
 
                     // Filter out brokers that do not meet the rules.
                     List<BrokerFilter> filterPipeline = 
getBrokerFilterPipeline();
+                    ArrayList<CompletableFuture<Map<String, 
BrokerLookupData>>> futures =
+                            new ArrayList<>(filterPipeline.size());
                     for (final BrokerFilter filter : filterPipeline) {
-                        try {
-                            filter.filter(availableBrokerCandidates, bundle, 
context);
-                            // Preserve the filter successes result.
-                            
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
-                        } catch (BrokerFilterException e) {
-                            // TODO: We may need to revisit this error case.
-                            log.error("Failed to filter out brokers.", e);
-                            availableBrokerCandidates = new 
HashMap<>(availableBrokers);
-                        }
-                    }
-                    if (availableBrokerCandidates.isEmpty()) {
-                        return 
CompletableFuture.completedFuture(Optional.empty());
+                        CompletableFuture<Map<String, BrokerLookupData>> 
future =
+                                filter.filter(availableBrokerCandidates, 
bundle, context);
+                        futures.add(future);

Review Comment:
   do we need process order for the filter here. the k8s scheduler will apply 
filter orderly. I wonder if we need it.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -489,24 +489,27 @@ public CompletableFuture<Optional<String>> 
selectAsync(ServiceUnitId bundle,
 
                     // Filter out brokers that do not meet the rules.
                     List<BrokerFilter> filterPipeline = 
getBrokerFilterPipeline();
+                    ArrayList<CompletableFuture<Map<String, 
BrokerLookupData>>> futures =
+                            new ArrayList<>(filterPipeline.size());
                     for (final BrokerFilter filter : filterPipeline) {
-                        try {
-                            filter.filter(availableBrokerCandidates, bundle, 
context);
-                            // Preserve the filter successes result.
-                            
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
-                        } catch (BrokerFilterException e) {
-                            // TODO: We may need to revisit this error case.
-                            log.error("Failed to filter out brokers.", e);
-                            availableBrokerCandidates = new 
HashMap<>(availableBrokers);
-                        }
-                    }
-                    if (availableBrokerCandidates.isEmpty()) {
-                        return 
CompletableFuture.completedFuture(Optional.empty());
+                        CompletableFuture<Map<String, BrokerLookupData>> 
future =
+                                filter.filter(availableBrokerCandidates, 
bundle, context);
+                        futures.add(future);
                     }
-                    Set<String> candidateBrokers = 
availableBrokerCandidates.keySet();
+                    CompletableFuture<Optional<String>> result = new 
CompletableFuture<>();
+                    FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            log.error("Failed to filter out brokers when 
select bundle: {}", bundle, ex);

Review Comment:
   It seems we ignore all the filter result exception here. does this will 
inference the filter logic ?  or we can choose some soft limit filter exception 
can be ignored and some filter are not.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java:
##########
@@ -53,6 +54,20 @@ private Optional<NamespaceIsolationPolicies> 
getIsolationPolicies(String cluster
         }
     }
 
+    private CompletableFuture<Optional<NamespaceIsolationPolicies>> 
getIsolationPoliciesAsync(String clusterName) {
+        return this.pulsar.getPulsarResources().getNamespaceResources()
+                
.getIsolationPolicies().getIsolationDataPoliciesAsync(clusterName);

Review Comment:
   I'm not sure the callback of this CompletableFuture is still execute on the 
metadataStore thread. if yes, if switch callback execute thread to other thread 
is needed ? WDYT?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -181,6 +181,98 @@ public static void applyNamespacePolicies(final 
ServiceUnitId serviceUnit,
         }
     }
 
+    public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
+            final ServiceUnitId serviceUnit, final 
SimpleResourceAllocationPolicies policies,
+            final Set<String> availableBrokers, final 
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+        NamespaceName namespace = serviceUnit.getNamespaceObject();
+        return 
policies.areIsolationPoliciesPresentAsync(namespace).thenApply(isIsolationPoliciesPresent
 -> {
+            final Set<String> brokerCandidateCache = new HashSet<>();
+            Set<String> primariesCache = localPrimariesCache.get();
+            primariesCache.clear();
+
+            Set<String> secondaryCache = localSecondaryCache.get();
+            secondaryCache.clear();
+            boolean isNonPersistentTopic = (serviceUnit instanceof 
NamespaceBundle)
+                    ? ((NamespaceBundle) serviceUnit).hasNonPersistentTopic() 
: false;
+            if (isIsolationPoliciesPresent) {
+                LOG.debug("Isolation Policies Present for namespace - [{}]", 
namespace.toString());
+            }
+            for (final String broker : availableBrokers) {
+                final String brokerUrlString = String.format("http://%s";, 
broker);
+                URL brokerUrl;
+                try {
+                    brokerUrl = new URL(brokerUrlString);
+                } catch (MalformedURLException e) {
+                    LOG.error("Unable to parse brokerUrl from ResourceUnitId", 
e);
+                    continue;
+                }
+                // todo: in future check if the resource unit has resources to 
take the namespace
+                if (isIsolationPoliciesPresent) {
+                    // note: serviceUnitID is namespace name and ResourceID is 
brokerName
+                    if (policies.isPrimaryBroker(namespace, 
brokerUrl.getHost())) {
+                        primariesCache.add(broker);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Added Primary Broker - [{}] as possible 
Candidates for"
+                                    + " namespace - [{}] with policies", 
brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else if (policies.isSecondaryBroker(namespace, 
brokerUrl.getHost())) {
+                        secondaryCache.add(broker);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "Added Shared Broker - [{}] as possible "
+                                            + "Candidates for namespace - [{}] 
with policies",
+                                    brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Skipping Broker - [{}] not primary 
broker and not shared"
+                                            + " for namespace - [{}] ", 
brokerUrl.getHost(), namespace.toString());
+                        }
+
+                    }
+                } else {
+                    // non-persistent topic can be assigned to only those 
brokers that enabled for non-persistent topic
+                    if (isNonPersistentTopic
+                            && 
!brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Filter broker- [{}] because it doesn't 
support non-persistent namespace - [{}]",
+                                    brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else if (!isNonPersistentTopic
+                            && 
!brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+                        // persistent topic can be assigned to only brokers 
that enabled for persistent-topic
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Filter broker- [{}] because broker only 
supports non-persistent "
+                                            + "namespace - [{}]", 
brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else if (policies.isSharedBroker(brokerUrl.getHost())) {
+                        secondaryCache.add(broker);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Added Shared Broker - [{}] as possible 
Candidates for namespace - [{}]",
+                                    brokerUrl.getHost(), namespace.toString());
+                        }
+                    }
+                }
+            }
+            if (isIsolationPoliciesPresent) {
+                brokerCandidateCache.addAll(primariesCache);
+                if (policies.shouldFailoverToSecondaries(namespace, 
primariesCache.size())) {
+                    LOG.debug(

Review Comment:
   LOG.isDebugEnabled



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -181,6 +181,98 @@ public static void applyNamespacePolicies(final 
ServiceUnitId serviceUnit,
         }
     }
 
+    public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
+            final ServiceUnitId serviceUnit, final 
SimpleResourceAllocationPolicies policies,
+            final Set<String> availableBrokers, final 
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+        NamespaceName namespace = serviceUnit.getNamespaceObject();
+        return 
policies.areIsolationPoliciesPresentAsync(namespace).thenApply(isIsolationPoliciesPresent
 -> {
+            final Set<String> brokerCandidateCache = new HashSet<>();
+            Set<String> primariesCache = localPrimariesCache.get();
+            primariesCache.clear();
+
+            Set<String> secondaryCache = localSecondaryCache.get();
+            secondaryCache.clear();
+            boolean isNonPersistentTopic = (serviceUnit instanceof 
NamespaceBundle)
+                    ? ((NamespaceBundle) serviceUnit).hasNonPersistentTopic() 
: false;
+            if (isIsolationPoliciesPresent) {
+                LOG.debug("Isolation Policies Present for namespace - [{}]", 
namespace.toString());

Review Comment:
   LOG.isDebugEnabled



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -181,6 +181,98 @@ public static void applyNamespacePolicies(final 
ServiceUnitId serviceUnit,
         }
     }
 
+    public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
+            final ServiceUnitId serviceUnit, final 
SimpleResourceAllocationPolicies policies,
+            final Set<String> availableBrokers, final 
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+        NamespaceName namespace = serviceUnit.getNamespaceObject();
+        return 
policies.areIsolationPoliciesPresentAsync(namespace).thenApply(isIsolationPoliciesPresent
 -> {
+            final Set<String> brokerCandidateCache = new HashSet<>();
+            Set<String> primariesCache = localPrimariesCache.get();
+            primariesCache.clear();
+
+            Set<String> secondaryCache = localSecondaryCache.get();
+            secondaryCache.clear();
+            boolean isNonPersistentTopic = (serviceUnit instanceof 
NamespaceBundle)
+                    ? ((NamespaceBundle) serviceUnit).hasNonPersistentTopic() 
: false;
+            if (isIsolationPoliciesPresent) {
+                LOG.debug("Isolation Policies Present for namespace - [{}]", 
namespace.toString());
+            }
+            for (final String broker : availableBrokers) {
+                final String brokerUrlString = String.format("http://%s";, 
broker);
+                URL brokerUrl;
+                try {
+                    brokerUrl = new URL(brokerUrlString);
+                } catch (MalformedURLException e) {
+                    LOG.error("Unable to parse brokerUrl from ResourceUnitId", 
e);
+                    continue;
+                }
+                // todo: in future check if the resource unit has resources to 
take the namespace
+                if (isIsolationPoliciesPresent) {
+                    // note: serviceUnitID is namespace name and ResourceID is 
brokerName
+                    if (policies.isPrimaryBroker(namespace, 
brokerUrl.getHost())) {
+                        primariesCache.add(broker);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Added Primary Broker - [{}] as possible 
Candidates for"
+                                    + " namespace - [{}] with policies", 
brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else if (policies.isSecondaryBroker(namespace, 
brokerUrl.getHost())) {
+                        secondaryCache.add(broker);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "Added Shared Broker - [{}] as possible "
+                                            + "Candidates for namespace - [{}] 
with policies",
+                                    brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Skipping Broker - [{}] not primary 
broker and not shared"
+                                            + " for namespace - [{}] ", 
brokerUrl.getHost(), namespace.toString());
+                        }
+
+                    }
+                } else {
+                    // non-persistent topic can be assigned to only those 
brokers that enabled for non-persistent topic
+                    if (isNonPersistentTopic
+                            && 
!brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Filter broker- [{}] because it doesn't 
support non-persistent namespace - [{}]",
+                                    brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else if (!isNonPersistentTopic
+                            && 
!brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+                        // persistent topic can be assigned to only brokers 
that enabled for persistent-topic
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Filter broker- [{}] because broker only 
supports non-persistent "
+                                            + "namespace - [{}]", 
brokerUrl.getHost(), namespace.toString());
+                        }
+                    } else if (policies.isSharedBroker(brokerUrl.getHost())) {
+                        secondaryCache.add(broker);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Added Shared Broker - [{}] as possible 
Candidates for namespace - [{}]",
+                                    brokerUrl.getHost(), namespace.toString());
+                        }
+                    }
+                }
+            }
+            if (isIsolationPoliciesPresent) {
+                brokerCandidateCache.addAll(primariesCache);
+                if (policies.shouldFailoverToSecondaries(namespace, 
primariesCache.size())) {
+                    LOG.debug(
+                            "Not enough of primaries [{}] available for 
namespace - [{}], "
+                                    + "adding shared [{}] as possible 
candidate owners",
+                            primariesCache.size(), namespace.toString(), 
secondaryCache.size());
+                    brokerCandidateCache.addAll(secondaryCache);
+                }
+            } else {
+                LOG.debug(

Review Comment:
   LOG.isDebugEnabled



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to