heesung-sn commented on code in PR #19813:
URL: https://github.com/apache/pulsar/pull/19813#discussion_r1144124188


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java:
##########
@@ -134,25 +143,29 @@ public void init() throws MetadataStoreException {
                     (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) 
? BoundType.CLOSED : BoundType.OPEN);
             return new NamespaceBundle(NamespaceName.get(namespace), 
hashRange, factory);
         }).when(factory).getBundle(anyString(), anyString());
-        doReturn(true).when(antiAffinityGroupPolicyHelper).canUnload(any(), 
any(), any(), any());
+        doReturn(new 
AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager();

Review Comment:
   updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -195,47 +209,65 @@ Optional<UnloadDecision.Reason> update(final 
LoadDataStore<BrokerLoadData> loadS
 
                 double load = localBrokerData.getWeightedMaxEMA();
 
-                minBrokers.offer(broker);
-                if (minBrokers.size() > maxTransfers) {
-                    minBrokers.poll();
-                }
-                maxBrokers.offer(broker);
-                if (maxBrokers.size() > maxTransfers) {
-                    maxBrokers.poll();
-                }
                 sum += load;
                 sqSum += load * load;
                 totalBrokers++;
             }
 
-
             if (totalBrokers == 0) {
                 if (decisionReason == null) {
                     decisionReason = NoBrokers;
                 }
                 update(0.0, 0.0, 0);
+                if (debug) {
+                    log.info("There is no broker load data.");
+                }
+                return Optional.of(decisionReason);
+            }
+
+            if (!missingLoadDataBrokers.isEmpty()) {
+                decisionReason = NoLoadData;
+                update(0.0, 0.0, 0);
+                if (debug) {
+                    log.info("There is missing load data from brokers:{}", 
missingLoadDataBrokers);
+                }
                 return Optional.of(decisionReason);
             }
 
             update(sum, sqSum, totalBrokers);
             return Optional.empty();
         }
 
-        boolean hasTransferableBrokers() {
-            return !(maxBrokers.isEmpty() || minBrokers.isEmpty()
-                    || maxBrokers.peekLast().equals(minBrokers().peekLast()));
-        }
-
         void setLoadDataStore(LoadDataStore<BrokerLoadData> loadDataStore) {
             this.loadDataStore = loadDataStore;
+            brokersSortedByLoad.addAll(loadDataStore.entrySet());
+            Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare(
+                    a.getValue().getWeightedMaxEMA(),
+                    b.getValue().getWeightedMaxEMA()));

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to