heesung-sn commented on code in PR #23349:
URL: https://github.com/apache/pulsar/pull/23349#discussion_r1776857280


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -791,14 +782,36 @@ public void close() throws PulsarServerException {
                     } catch (Exception e) {
                         throw new PulsarServerException(e);
                     } finally {
-                        this.started = false;
+                        state.set(State.INIT);
                     }
                 }
 
             }
         }
     }
 
+    private void stopLoadDataReportTasks() {
+        if (brokerLoadDataReportTask != null) {
+            brokerLoadDataReportTask.cancel(true);
+        }
+        if (topBundlesLoadDataReportTask != null) {
+            topBundlesLoadDataReportTask.cancel(true);
+        }
+        if (monitorTask != null) {
+            monitorTask.cancel(true);
+        }
+        try {
+            brokerLoadDataStore.shutdown();
+        } catch (IOException e) {
+            log.warn("Failed to shutdown brokerLoadDataStore", e);
+        }
+        try {
+            topBundlesLoadDataStore.shutdown();
+        } catch (IOException e) {
+            log.warn("Failed to shutdown brokerLoadDataStore", e);

Review Comment:
   The log message here should say `topBundlesLoadDataStore`, not `brokerLoadDataStore`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -1391,7 +1402,9 @@ private void waitForCleanups(String broker, boolean 
excludeSystemTopics, int max
                     continue;
                 }
 
-                if (data.state() == Owned && broker.equals(data.dstBroker())) {
+                if (data.state() == Free) {
+                    futures.put(serviceUnit, handleFreeEvent(serviceUnit, 
data));

Review Comment:
   Why do we need to additionally call `handleFreeEvent` here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -679,11 +678,15 @@ private void handleEvent(String serviceUnit, 
ServiceUnitStateData data) {
                     brokerId, serviceUnit, data, totalHandledRequests);
         }
 
-        if (channelState == Disabled) {
+        ServiceUnitState state = state(data);
+        if (channelState == Disabled && (data == null || !data.force())) {

Review Comment:
   Why do we need the `(data == null || !data.force())` logic here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -982,17 +1017,28 @@ protected void monitor() {
     }
 
     public void disableBroker() throws Exception {
+        // TopicDoesNotExistException might be thrown and it's not 
recoverable. Enable this flag to exit playFollower()
+        // or playLeader() quickly.
+        if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
+            failForUnexpectedState("disableBroker");
+        }
+        stopLoadDataReportTasks();
         serviceUnitStateChannel.cleanOwnerships();
-        leaderElectionService.close();
         brokerRegistry.unregister();
+        leaderElectionService.close();
+        final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
+                .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        if (availableBrokers.isEmpty()) {
+            close();
+        }
         // Close the internal topics (if owned any) after giving up the 
possible leader role,
         // so that the subsequent lookups could hit the next leader.
         closeInternalTopics();
     }
 
     private void closeInternalTopics() {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (String name : INTERNAL_TOPICS) {
+        for (String name : Set.of(BROKER_LOAD_DATA_STORE_TOPIC, 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC)) {

Review Comment:
   Could you explain why this change is required?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java:
##########
@@ -92,7 +92,11 @@ public synchronized CompletableFuture<Void> 
removeAsync(String key) {
     public synchronized Optional<T> get(String key) {
         String msg = validateTableView();
         if (StringUtils.isNotBlank(msg)) {
-            throw new IllegalStateException(msg);
+            if (msg.equals(SHUTDOWN_ERR_MSG)) {
+                return Optional.empty();

Review Comment:
   Can you explain why we need to return `Optional.empty()` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to