BewareMyPower commented on code in PR #20822:
URL: https://github.com/apache/pulsar/pull/20822#discussion_r1268071510
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -837,99 +851,119 @@ public Optional<String> selectBrokerForAssignment(final
ServiceUnitId serviceUni
// If the given bundle is already in preallocated, return
the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
- final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
- key -> getBundleDataOrDefault(bundle));
- brokerCandidateCache.clear();
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- // filter brokers which owns topic higher than threshold
-
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache,
loadData,
- conf.getLoadBalancerBrokerMaxTopics());
-
- // distribute namespaces to domain and brokers according to
anti-affinity-group
- LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar,
serviceUnit.toString(),
- brokerCandidateCache,
- brokerToNamespaceToBundleRange,
brokerToFailureDomainMap);
-
- // distribute bundles evenly to candidate-brokers if enable
- if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
-
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
- brokerCandidateCache,
- brokerToNamespaceToBundleRange);
- if (log.isDebugEnabled()) {
- log.debug("enable distribute bundles evenly to
candidate-brokers, broker candidate count={}",
- brokerCandidateCache.size());
- }
+ Optional<String> broker = selectBroker(serviceUnit);
+ if (!broker.isPresent()) {
+ // If no broker is selected, return empty.
+ return broker;
}
- log.info("{} brokers being considered for assignment of {}",
brokerCandidateCache.size(), bundle);
+ // Add new bundle to preallocated.
+ preallocateBundle(bundle, broker.get());
+ return broker;
+ }
+ } finally {
+ selectBrokerForAssignment.observe(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
+ }
+ }
- // Use the filter pipeline to finalize broker candidates.
- try {
- for (BrokerFilter filter : filterPipeline) {
- filter.filter(brokerCandidateCache, data, loadData,
conf);
- }
- } catch (BrokerFilterException x) {
- // restore the list of brokers to the full set
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- }
+ private void preallocateBundle(String bundle, String broker) {
+ final BundleData data =
loadData.getBundleData().computeIfAbsent(bundle,
+ key -> getBundleDataOrDefault(bundle));
+
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle,
data);
+ preallocatedBundleToBroker.put(bundle, broker);
+
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>
namespaceToBundleRange =
+ brokerToNamespaceToBundleRange
+ .computeIfAbsent(broker,
+ k -> ConcurrentOpenHashMap.<String,
+
ConcurrentOpenHashSet<String>>newBuilder()
+ .build());
+ synchronized (namespaceToBundleRange) {
+ namespaceToBundleRange.computeIfAbsent(namespaceName,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+ .add(bundleRange);
+ }
+ }
- if (brokerCandidateCache.isEmpty()) {
- // restore the list of brokers to the full set
- LoadManagerShared.applyNamespacePolicies(serviceUnit,
policies, brokerCandidateCache,
- getAvailableBrokers(),
- brokerTopicLoadingPredicate);
- }
+ @VisibleForTesting
+ protected Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
Review Comment:
```suggestion
Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
```
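The `protected` modifier is not needed here; package-private visibility (as in the suggestion above) is sufficient for this `@VisibleForTesting` method.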
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]