heesung-sn commented on code in PR #21854:
URL: https://github.com/apache/pulsar/pull/21854#discussion_r1442252356


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -92,17 +153,35 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
             if (log.isDebugEnabled()) {
                 log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
             }
-            this.complete(serviceUnit, t);
+            complete(serviceUnit, t);
             return;
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Handling {} for service unit {}", data, serviceUnit);
+        }
         ServiceUnitState state = ServiceUnitStateData.state(data);

Review Comment:
   `handleEvent` is now called only after the `closeServiceUnit` future 
completes. However, `recordReleaseLatency` needs to start before 
`closeServiceUnit` so that the measured latency includes the time spent in 
`closeServiceUnit`.
   
   This means we probably need to add `StateChangeListeners::notifyOnArrival` 
and call it at the beginning of the handler, like this:
   ``` java
   private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
           // Notify first so latency measurement starts before closeServiceUnit.
           stateChangeListeners.notifyOnArrival(..);
           if (isTargetBroker(data.sourceBroker())) {
               ServiceUnitStateData next;
               if (isTransferCommand(data)) {
                   next = new ServiceUnitStateData(
                           Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
                   // TODO: when close, pass message to clients to connect to the new broker
               } else {
                   next = new ServiceUnitStateData(
                           Free, null, data.sourceBroker(), getNextVersionId(data));
               }
               stateChangeListeners.notifyOnCompletion(
                               closeServiceUnit(serviceUnit)
                                       .thenCompose(__ -> pubAsync(serviceUnit, next)),
                               serviceUnit, data)
                       .whenComplete((__, e) -> log(e, serviceUnit, data, next));
           }
       }
   ```
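
To make the timing concern concrete, here is a minimal standalone sketch of the arrival/completion pattern. It is not the Pulsar implementation: `LatencyListener`, its single-argument `notifyOnArrival`, `releaseLatencyNanos`, and `slowClose` (a stand-in for `closeServiceUnit`) are all hypothetical names invented for illustration. It only shows that starting the timer on arrival lets the recorded latency cover the close step.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ReleaseLatencySketch {

    // Hypothetical listener: stores a start time on arrival and
    // computes the elapsed time when the wrapped future completes.
    static class LatencyListener {
        private final Map<String, Long> startNanos = new ConcurrentHashMap<>();
        private final Map<String, Long> latencyNanos = new ConcurrentHashMap<>();

        void notifyOnArrival(String serviceUnit) {
            startNanos.put(serviceUnit, System.nanoTime());
        }

        <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> future, String serviceUnit) {
            // whenComplete runs on both success and failure, so the start
            // entry is always cleaned up.
            return future.whenComplete((result, error) -> {
                Long start = startNanos.remove(serviceUnit);
                if (start != null) {
                    latencyNanos.put(serviceUnit, System.nanoTime() - start);
                }
            });
        }

        // Returns null if no latency has been recorded for this unit.
        Long releaseLatencyNanos(String serviceUnit) {
            return latencyNanos.get(serviceUnit);
        }
    }

    // Stand-in for closeServiceUnit: completes after roughly `millis` ms.
    static CompletableFuture<Void> slowClose(long millis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) {
        LatencyListener listener = new LatencyListener();
        String unit = "example-service-unit"; // assumed placeholder name

        listener.notifyOnArrival(unit);                            // timer starts here
        listener.notifyOnCompletion(slowClose(50), unit).join();   // close time is included

        System.out.println("release latency (ns): " + listener.releaseLatencyNanos(unit));
    }
}
```

If `notifyOnArrival` were instead called only after the close future completed, the recorded value would exclude the close time, which is the gap described above.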



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
