lifepuzzlefun commented on code in PR #20666:
URL: https://github.com/apache/pulsar/pull/20666#discussion_r1245295502
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -489,24 +489,27 @@ public CompletableFuture<Optional<String>>
selectAsync(ServiceUnitId bundle,
// Filter out brokers that do not meet the rules.
List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
+ ArrayList<CompletableFuture<Map<String,
BrokerLookupData>>> futures =
+ new ArrayList<>(filterPipeline.size());
for (final BrokerFilter filter : filterPipeline) {
- try {
- filter.filter(availableBrokerCandidates, bundle,
context);
- // Preserve the filter successes result.
-
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
- } catch (BrokerFilterException e) {
- // TODO: We may need to revisit this error case.
- log.error("Failed to filter out brokers.", e);
- availableBrokerCandidates = new
HashMap<>(availableBrokers);
- }
- }
- if (availableBrokerCandidates.isEmpty()) {
- return
CompletableFuture.completedFuture(Optional.empty());
+ CompletableFuture<Map<String, BrokerLookupData>>
future =
+ filter.filter(availableBrokerCandidates,
bundle, context);
+ futures.add(future);
Review Comment:
do we need process order for the filter here. the k8s scheduler will apply
filter orderly. I wonder if we need it.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -489,24 +489,27 @@ public CompletableFuture<Optional<String>>
selectAsync(ServiceUnitId bundle,
// Filter out brokers that do not meet the rules.
List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
+ ArrayList<CompletableFuture<Map<String,
BrokerLookupData>>> futures =
+ new ArrayList<>(filterPipeline.size());
for (final BrokerFilter filter : filterPipeline) {
- try {
- filter.filter(availableBrokerCandidates, bundle,
context);
- // Preserve the filter successes result.
-
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
- } catch (BrokerFilterException e) {
- // TODO: We may need to revisit this error case.
- log.error("Failed to filter out brokers.", e);
- availableBrokerCandidates = new
HashMap<>(availableBrokers);
- }
- }
- if (availableBrokerCandidates.isEmpty()) {
- return
CompletableFuture.completedFuture(Optional.empty());
+ CompletableFuture<Map<String, BrokerLookupData>>
future =
+ filter.filter(availableBrokerCandidates,
bundle, context);
+ futures.add(future);
}
- Set<String> candidateBrokers =
availableBrokerCandidates.keySet();
+ CompletableFuture<Optional<String>> result = new
CompletableFuture<>();
+ FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to filter out brokers when
select bundle: {}", bundle, ex);
Review Comment:
It seems we ignore all the filter result exception here. does this will
inference the filter logic ? or we can choose some soft limit filter exception
can be ignored and some filter are not.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java:
##########
@@ -53,6 +54,20 @@ private Optional<NamespaceIsolationPolicies>
getIsolationPolicies(String cluster
}
}
+ private CompletableFuture<Optional<NamespaceIsolationPolicies>>
getIsolationPoliciesAsync(String clusterName) {
+ return this.pulsar.getPulsarResources().getNamespaceResources()
+
.getIsolationPolicies().getIsolationDataPoliciesAsync(clusterName);
Review Comment:
I'm not sure the callback of this CompletableFuture is still execute on the
metadataStore thread. if yes, if switch callback execute thread to other thread
is needed ? WDYT?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -181,6 +181,98 @@ public static void applyNamespacePolicies(final
ServiceUnitId serviceUnit,
}
}
+ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
+ final ServiceUnitId serviceUnit, final
SimpleResourceAllocationPolicies policies,
+ final Set<String> availableBrokers, final
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+ NamespaceName namespace = serviceUnit.getNamespaceObject();
+ return
policies.areIsolationPoliciesPresentAsync(namespace).thenApply(isIsolationPoliciesPresent
-> {
+ final Set<String> brokerCandidateCache = new HashSet<>();
+ Set<String> primariesCache = localPrimariesCache.get();
+ primariesCache.clear();
+
+ Set<String> secondaryCache = localSecondaryCache.get();
+ secondaryCache.clear();
+ boolean isNonPersistentTopic = (serviceUnit instanceof
NamespaceBundle)
+ ? ((NamespaceBundle) serviceUnit).hasNonPersistentTopic()
: false;
+ if (isIsolationPoliciesPresent) {
+ LOG.debug("Isolation Policies Present for namespace - [{}]",
namespace.toString());
+ }
+ for (final String broker : availableBrokers) {
+ final String brokerUrlString = String.format("http://%s",
broker);
+ URL brokerUrl;
+ try {
+ brokerUrl = new URL(brokerUrlString);
+ } catch (MalformedURLException e) {
+ LOG.error("Unable to parse brokerUrl from ResourceUnitId",
e);
+ continue;
+ }
+ // todo: in future check if the resource unit has resources to
take the namespace
+ if (isIsolationPoliciesPresent) {
+ // note: serviceUnitID is namespace name and ResourceID is
brokerName
+ if (policies.isPrimaryBroker(namespace,
brokerUrl.getHost())) {
+ primariesCache.add(broker);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added Primary Broker - [{}] as possible
Candidates for"
+ + " namespace - [{}] with policies",
brokerUrl.getHost(), namespace.toString());
+ }
+ } else if (policies.isSecondaryBroker(namespace,
brokerUrl.getHost())) {
+ secondaryCache.add(broker);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Added Shared Broker - [{}] as possible "
+ + "Candidates for namespace - [{}]
with policies",
+ brokerUrl.getHost(), namespace.toString());
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping Broker - [{}] not primary
broker and not shared"
+ + " for namespace - [{}] ",
brokerUrl.getHost(), namespace.toString());
+ }
+
+ }
+ } else {
+ // non-persistent topic can be assigned to only those
brokers that enabled for non-persistent topic
+ if (isNonPersistentTopic
+ &&
!brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Filter broker- [{}] because it doesn't
support non-persistent namespace - [{}]",
+ brokerUrl.getHost(), namespace.toString());
+ }
+ } else if (!isNonPersistentTopic
+ &&
!brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+ // persistent topic can be assigned to only brokers
that enabled for persistent-topic
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Filter broker- [{}] because broker only
supports non-persistent "
+ + "namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
+ }
+ } else if (policies.isSharedBroker(brokerUrl.getHost())) {
+ secondaryCache.add(broker);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added Shared Broker - [{}] as possible
Candidates for namespace - [{}]",
+ brokerUrl.getHost(), namespace.toString());
+ }
+ }
+ }
+ }
+ if (isIsolationPoliciesPresent) {
+ brokerCandidateCache.addAll(primariesCache);
+ if (policies.shouldFailoverToSecondaries(namespace,
primariesCache.size())) {
+ LOG.debug(
Review Comment:
LOG.isDebugEnabled
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -181,6 +181,98 @@ public static void applyNamespacePolicies(final
ServiceUnitId serviceUnit,
}
}
+ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
+ final ServiceUnitId serviceUnit, final
SimpleResourceAllocationPolicies policies,
+ final Set<String> availableBrokers, final
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+ NamespaceName namespace = serviceUnit.getNamespaceObject();
+ return
policies.areIsolationPoliciesPresentAsync(namespace).thenApply(isIsolationPoliciesPresent
-> {
+ final Set<String> brokerCandidateCache = new HashSet<>();
+ Set<String> primariesCache = localPrimariesCache.get();
+ primariesCache.clear();
+
+ Set<String> secondaryCache = localSecondaryCache.get();
+ secondaryCache.clear();
+ boolean isNonPersistentTopic = (serviceUnit instanceof
NamespaceBundle)
+ ? ((NamespaceBundle) serviceUnit).hasNonPersistentTopic()
: false;
+ if (isIsolationPoliciesPresent) {
+ LOG.debug("Isolation Policies Present for namespace - [{}]",
namespace.toString());
Review Comment:
LOG.isDebugEnabled
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
##########
@@ -181,6 +181,98 @@ public static void applyNamespacePolicies(final
ServiceUnitId serviceUnit,
}
}
+ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
+ final ServiceUnitId serviceUnit, final
SimpleResourceAllocationPolicies policies,
+ final Set<String> availableBrokers, final
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+ NamespaceName namespace = serviceUnit.getNamespaceObject();
+ return
policies.areIsolationPoliciesPresentAsync(namespace).thenApply(isIsolationPoliciesPresent
-> {
+ final Set<String> brokerCandidateCache = new HashSet<>();
+ Set<String> primariesCache = localPrimariesCache.get();
+ primariesCache.clear();
+
+ Set<String> secondaryCache = localSecondaryCache.get();
+ secondaryCache.clear();
+ boolean isNonPersistentTopic = (serviceUnit instanceof
NamespaceBundle)
+ ? ((NamespaceBundle) serviceUnit).hasNonPersistentTopic()
: false;
+ if (isIsolationPoliciesPresent) {
+ LOG.debug("Isolation Policies Present for namespace - [{}]",
namespace.toString());
+ }
+ for (final String broker : availableBrokers) {
+ final String brokerUrlString = String.format("http://%s",
broker);
+ URL brokerUrl;
+ try {
+ brokerUrl = new URL(brokerUrlString);
+ } catch (MalformedURLException e) {
+ LOG.error("Unable to parse brokerUrl from ResourceUnitId",
e);
+ continue;
+ }
+ // todo: in future check if the resource unit has resources to
take the namespace
+ if (isIsolationPoliciesPresent) {
+ // note: serviceUnitID is namespace name and ResourceID is
brokerName
+ if (policies.isPrimaryBroker(namespace,
brokerUrl.getHost())) {
+ primariesCache.add(broker);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added Primary Broker - [{}] as possible
Candidates for"
+ + " namespace - [{}] with policies",
brokerUrl.getHost(), namespace.toString());
+ }
+ } else if (policies.isSecondaryBroker(namespace,
brokerUrl.getHost())) {
+ secondaryCache.add(broker);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Added Shared Broker - [{}] as possible "
+ + "Candidates for namespace - [{}]
with policies",
+ brokerUrl.getHost(), namespace.toString());
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping Broker - [{}] not primary
broker and not shared"
+ + " for namespace - [{}] ",
brokerUrl.getHost(), namespace.toString());
+ }
+
+ }
+ } else {
+ // non-persistent topic can be assigned to only those
brokers that enabled for non-persistent topic
+ if (isNonPersistentTopic
+ &&
!brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Filter broker- [{}] because it doesn't
support non-persistent namespace - [{}]",
+ brokerUrl.getHost(), namespace.toString());
+ }
+ } else if (!isNonPersistentTopic
+ &&
!brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
+ // persistent topic can be assigned to only brokers
that enabled for persistent-topic
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Filter broker- [{}] because broker only
supports non-persistent "
+ + "namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
+ }
+ } else if (policies.isSharedBroker(brokerUrl.getHost())) {
+ secondaryCache.add(broker);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added Shared Broker - [{}] as possible
Candidates for namespace - [{}]",
+ brokerUrl.getHost(), namespace.toString());
+ }
+ }
+ }
+ }
+ if (isIsolationPoliciesPresent) {
+ brokerCandidateCache.addAll(primariesCache);
+ if (policies.shouldFailoverToSecondaries(namespace,
primariesCache.size())) {
+ LOG.debug(
+ "Not enough of primaries [{}] available for
namespace - [{}], "
+ + "adding shared [{}] as possible
candidate owners",
+ primariesCache.size(), namespace.toString(),
secondaryCache.size());
+ brokerCandidateCache.addAll(secondaryCache);
+ }
+ } else {
+ LOG.debug(
Review Comment:
LOG.isDebugEnabled
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]