gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1114234240


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -851,28 +985,51 @@ private void doCleanup(String broker) {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
-        if (serviceUnitTombstoneCnt > 0) {
-            this.totalCleanupCnt++;
-            this.totalServiceUnitCleanupTombstoneCnt += 
serviceUnitTombstoneCnt;
-            this.totalBrokerCleanupTombstoneCnt++;
+        if (orphanServiceUnitCleanupCnt > 0) {
+            this.totalOrphanServiceUnitCleanupCnt += 
orphanServiceUnitCleanupCnt;
+            this.totalInactiveBrokerCleanupCnt++;
         }
 
         double cleanupTime = TimeUnit.NANOSECONDS
                 .toMillis((System.nanoTime() - startTime));
         // TODO: clean load data stores
         log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
-                        + "Published tombstone for orphan service units: 
serviceUnitTombstoneCnt:{}, "
+                        + "Cleaned up orphan service units: 
orphanServiceUnitCleanupCnt:{}, "
                         + "approximate cleanupErrorCnt:{}, metrics:{} ",
                 broker,
                 cleanupTime,
-                serviceUnitTombstoneCnt,
+                orphanServiceUnitCleanupCnt,
                 totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
                 printCleanupMetrics());
         cleanupJobs.remove(broker);
     }
 
-    // TODO: integrate this monitor logic when broker registry is added
-    private void monitorOwnerships(List<String> brokers) {
+    private Optional<ServiceUnitStateData> getOverrideStateData(String 
serviceUnit, ServiceUnitStateData orphanData,
+                                                                Set<String> 
availableBrokers,
+                                                                
LoadManagerContext context) {
+        if (isTransferCommand(orphanData)) {
+            // rollback to the src
+            return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.sourceBroker(), true));
+        } else if (orphanData.state() == Assigning) { // assign
+            // roll-forward to another broker

Review Comment:
   Do we need to check this is a transfer assign or not?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to