heesung-sn commented on code in PR #19813: URL: https://github.com/apache/pulsar/pull/19813#discussion_r1144124188
########## pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java: ########## @@ -134,25 +143,29 @@ public void init() throws MetadataStoreException { (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); return new NamespaceBundle(NamespaceName.get(namespace), hashRange, factory); }).when(factory).getBundle(anyString(), anyString()); - doReturn(true).when(antiAffinityGroupPolicyHelper).canUnload(any(), any(), any(), any()); + doReturn(new AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager(); Review Comment: updated. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java: ########## @@ -195,47 +209,65 @@ Optional<UnloadDecision.Reason> update(final LoadDataStore<BrokerLoadData> loadS double load = localBrokerData.getWeightedMaxEMA(); - minBrokers.offer(broker); - if (minBrokers.size() > maxTransfers) { - minBrokers.poll(); - } - maxBrokers.offer(broker); - if (maxBrokers.size() > maxTransfers) { - maxBrokers.poll(); - } sum += load; sqSum += load * load; totalBrokers++; } - if (totalBrokers == 0) { if (decisionReason == null) { decisionReason = NoBrokers; } update(0.0, 0.0, 0); + if (debug) { + log.info("There is no broker load data."); + } + return Optional.of(decisionReason); + } + + if (!missingLoadDataBrokers.isEmpty()) { + decisionReason = NoLoadData; + update(0.0, 0.0, 0); + if (debug) { + log.info("There is missing load data from brokers:{}", missingLoadDataBrokers); + } return Optional.of(decisionReason); } update(sum, sqSum, totalBrokers); return Optional.empty(); } - boolean hasTransferableBrokers() { - return !(maxBrokers.isEmpty() || minBrokers.isEmpty() - || maxBrokers.peekLast().equals(minBrokers().peekLast())); - } - void setLoadDataStore(LoadDataStore<BrokerLoadData> loadDataStore) { this.loadDataStore = loadDataStore; + brokersSortedByLoad.addAll(loadDataStore.entrySet()); + Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare( + a.getValue().getWeightedMaxEMA(), + b.getValue().getWeightedMaxEMA())); Review Comment: updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org