This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e91574ac7b4 [fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with extensible load manager  (#23349)
e91574ac7b4 is described below

commit e91574ac7b44348a05f1ae812c5aae3abb26fe64
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 27 17:33:28 2024 +0800

    [fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with extensible load manager  (#23349)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   7 +-
 .../extensions/ExtensibleLoadManagerImpl.java      | 130 +++++++++++++++------
 .../extensions/ExtensibleLoadManagerWrapper.java   |   2 +-
 .../channel/ServiceUnitStateChannelImpl.java       |  43 +++++--
 .../extensions/channel/ServiceUnitStateData.java   |   2 +-
 .../ServiceUnitStateDataConflictResolver.java      |   2 +-
 .../channel/ServiceUnitStateTableViewImpl.java     |  19 ++-
 .../filter/BrokerMaxTopicCountFilter.java          |   7 +-
 .../store/TableViewLoadDataStoreImpl.java          |  10 +-
 .../pulsar/broker/service/BrokerService.java       |   7 +-
 .../SystemTopicBasedTopicPoliciesService.java      |  32 +++--
 .../extensions/ExtensibleLoadManagerCloseTest.java |  50 ++++++--
 .../extensions/store/LoadDataStoreTest.java        |   3 +-
 .../apache/pulsar/client/impl/TableViewImpl.java   |   7 +-
 14 files changed, 240 insertions(+), 81 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a2f6fb9e977..6c768a07897 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -513,6 +513,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 return closeFuture;
             }
             LOG.info("Closing PulsarService");
+            if (topicPoliciesService != null) {
+                topicPoliciesService.close();
+            }
             if (brokerService != null) {
                 brokerService.unloadNamespaceBundlesGracefully();
             }
@@ -633,10 +636,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 transactionBufferClient.close();
             }
 
-            if (topicPoliciesService != null) {
-                topicPoliciesService.close();
-                topicPoliciesService = null;
-            }
 
             if (client != null) {
                 client.close();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 98ef6bf36ed..841f9bfb669 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -181,7 +181,14 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     private SplitManager splitManager;
 
-    volatile boolean started = false;
+    enum State {
+        INIT,
+        RUNNING,
+        // It's removing visibility of the current broker from other brokers. In this state, it cannot play as a leader
+        // or follower.
+        DISABLED,
+    }
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.INIT);
 
     private boolean configuredSystemTopics = false;
 
@@ -214,7 +221,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     }
 
     public Set<NamespaceBundle> getOwnedServiceUnits() {
-        if (!started) {
+        if (state.get() == State.INIT) {
             log.warn("Failed to get owned service units, load manager is not 
started.");
             return Collections.emptySet();
         }
@@ -344,7 +351,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     @Override
     public void start() throws PulsarServerException {
-        if (this.started) {
+        if (state.get() != State.INIT) {
             return;
         }
         try {
@@ -443,7 +450,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
                     this.splitScheduler.start();
                     this.initWaiter.complete(true);
-                    this.started = true;
+                    if (!state.compareAndSet(State.INIT, State.RUNNING)) {
+                        failForUnexpectedState("start");
+                    }
                     log.info("Started load manager.");
                 } catch (Throwable e) {
                     failStarting(e);
@@ -615,21 +624,17 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                                 filter.filterAsync(availableBrokerCandidates, 
bundle, context);
                         futures.add(future);
                     }
-                    CompletableFuture<Optional<String>> result = new 
CompletableFuture<>();
-                    FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
-                        if (ex != null) {
-                            // TODO: We may need to revisit this error case.
-                            log.error("Failed to filter out brokers when 
select bundle: {}", bundle, ex);
-                        }
+                    return FutureUtil.waitForAll(futures).exceptionally(e -> {
+                        // TODO: We may need to revisit this error case.
+                        log.error("Failed to filter out brokers when select 
bundle: {}", bundle, e);
+                        return null;
+                    }).thenApply(__ -> {
                         if (availableBrokerCandidates.isEmpty()) {
-                            result.complete(Optional.empty());
-                            return;
+                            return Optional.empty();
                         }
                         Set<String> candidateBrokers = 
availableBrokerCandidates.keySet();
-
-                        
result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle, 
context));
+                        return 
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
                     });
-                    return result;
                 });
     }
 
@@ -667,6 +672,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                                                               boolean force,
                                                               long timeout,
                                                               TimeUnit 
timeoutUnit) {
+        if (state.get() == State.INIT) {
+            return CompletableFuture.completedFuture(null);
+        }
         if 
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
 {
             log.info("Skip unloading namespace bundle: {}.", bundle);
             return CompletableFuture.completedFuture(null);
@@ -755,24 +763,11 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     @Override
     public void close() throws PulsarServerException {
-        if (!this.started) {
+        if (state.get() == State.INIT) {
             return;
         }
         try {
-            if (brokerLoadDataReportTask != null) {
-                brokerLoadDataReportTask.cancel(true);
-            }
-
-            if (topBundlesLoadDataReportTask != null) {
-                topBundlesLoadDataReportTask.cancel(true);
-            }
-
-            if (monitorTask != null) {
-                monitorTask.cancel(true);
-            }
-
-            this.brokerLoadDataStore.shutdown();
-            this.topBundlesLoadDataStore.shutdown();
+            stopLoadDataReportTasks();
             this.unloadScheduler.close();
             this.splitScheduler.close();
             this.serviceUnitStateTableViewSyncer.close();
@@ -791,7 +786,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                     } catch (Exception e) {
                         throw new PulsarServerException(e);
                     } finally {
-                        this.started = false;
+                        state.set(State.INIT);
                     }
                 }
 
@@ -799,6 +794,28 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         }
     }
 
+    private void stopLoadDataReportTasks() {
+        if (brokerLoadDataReportTask != null) {
+            brokerLoadDataReportTask.cancel(true);
+        }
+        if (topBundlesLoadDataReportTask != null) {
+            topBundlesLoadDataReportTask.cancel(true);
+        }
+        if (monitorTask != null) {
+            monitorTask.cancel(true);
+        }
+        try {
+            brokerLoadDataStore.shutdown();
+        } catch (IOException e) {
+            log.warn("Failed to shutdown brokerLoadDataStore", e);
+        }
+        try {
+            topBundlesLoadDataStore.shutdown();
+        } catch (IOException e) {
+            log.warn("Failed to shutdown topBundlesLoadDataStore", e);
+        }
+    }
+
     public static boolean isInternalTopic(String topic) {
         return INTERNAL_TOPICS.contains(topic)
                 || topic.startsWith(TOPIC)
@@ -814,13 +831,16 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         boolean becameFollower = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                if (!initWaiter.get()) {
+                if (!initWaiter.get() || disabled()) {
                     return;
                 }
                 if (!serviceUnitStateChannel.isChannelOwner()) {
                     becameFollower = true;
                     break;
                 }
+                if (disabled()) {
+                    return;
+                }
                 // Confirm the system topics have been created or create them 
if they do not exist.
                 // If the leader has changed, the new leader need to reset
                 // the local brokerService.topics (by this topic creations).
@@ -835,6 +855,11 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 }
                 break;
             } catch (Throwable e) {
+                if (disabled()) {
+                    log.warn("The broker:{} failed to set the role but exit 
because it's disabled",
+                            pulsar.getBrokerId(), e);
+                    return;
+                }
                 log.warn("The broker:{} failed to set the role. Retrying {} th 
...",
                         pulsar.getBrokerId(), ++retry, e);
                 try {
@@ -846,6 +871,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 }
             }
         }
+        if (disabled()) {
+            return;
+        }
 
         if (becameFollower) {
             log.warn("The broker:{} became follower while initializing leader 
role.", pulsar.getBrokerId());
@@ -869,13 +897,16 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         boolean becameLeader = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                if (!initWaiter.get()) {
+                if (!initWaiter.get() || disabled()) {
                     return;
                 }
                 if (serviceUnitStateChannel.isChannelOwner()) {
                     becameLeader = true;
                     break;
                 }
+                if (disabled()) {
+                    return;
+                }
                 unloadScheduler.close();
                 serviceUnitStateChannel.cancelOwnershipMonitor();
                 closeInternalTopics();
@@ -885,6 +916,11 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 serviceUnitStateTableViewSyncer.close();
                 break;
             } catch (Throwable e) {
+                if (disabled()) {
+                    log.warn("The broker:{} failed to set the role but exit 
because it's disabled",
+                            pulsar.getBrokerId(), e);
+                    return;
+                }
                 log.warn("The broker:{} failed to set the role. Retrying {} th 
...",
                         pulsar.getBrokerId(), ++retry, e);
                 try {
@@ -896,6 +932,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 }
             }
         }
+        if (disabled()) {
+            return;
+        }
 
         if (becameLeader) {
             log.warn("This broker:{} became leader while initializing follower 
role.", pulsar.getBrokerId());
@@ -982,9 +1021,20 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     }
 
     public void disableBroker() throws Exception {
+        // TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower()
+        // or playLeader() quickly.
+        if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
+            failForUnexpectedState("disableBroker");
+        }
+        stopLoadDataReportTasks();
         serviceUnitStateChannel.cleanOwnerships();
-        leaderElectionService.close();
         brokerRegistry.unregister();
+        leaderElectionService.close();
+        final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
+                .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        if (availableBrokers.isEmpty()) {
+            close();
+        }
         // Close the internal topics (if owned any) after giving up the possible leader role,
         // so that the subsequent lookups could hit the next leader.
         closeInternalTopics();
@@ -1018,4 +1068,16 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     protected ServiceUnitStateChannel 
createServiceUnitStateChannel(PulsarService pulsar) {
         return new ServiceUnitStateChannelImpl(pulsar);
     }
+
+    private void failForUnexpectedState(String msg) {
+        throw new IllegalStateException("Failed to " + msg + ", state: " + 
state.get());
+    }
+
+    boolean running() {
+        return state.get() == State.RUNNING;
+    }
+
+    private boolean disabled() {
+        return state.get() == State.DISABLED;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 6a48607977b..35f6cfcbcf5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -51,7 +51,7 @@ public class ExtensibleLoadManagerWrapper implements 
LoadManager {
     }
 
     public boolean started() {
-        return loadManager.started && 
loadManager.getServiceUnitStateChannel().started();
+        return loadManager.running() && 
loadManager.getServiceUnitStateChannel().started();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index ddbc9eacac9..ce975495feb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -255,6 +255,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     @Override
     public void cleanOwnerships() {
+        disable();
         doCleanup(brokerId, true);
     }
 
@@ -412,9 +413,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             if (owner.isPresent()) {
                 return isTargetBroker(owner.get());
             } else {
-                String msg = "There is no channel owner now.";
-                log.error(msg);
-                throw new IllegalStateException(msg);
+                throw new IllegalStateException("There is no channel owner 
now.");
             }
         });
     }
@@ -679,11 +678,15 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     brokerId, serviceUnit, data, totalHandledRequests);
         }
 
-        if (channelState == Disabled) {
+        ServiceUnitState state = state(data);
+        if (channelState == Disabled && (data == null || !data.force())) {
+            final var request = getOwnerRequests.remove(serviceUnit);
+            if (request != null) {
+                request.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(
+                        "cancel the lookup request for " + serviceUnit + " 
when receiving " + state));
+            }
             return;
         }
-
-        ServiceUnitState state = state(data);
         try {
             switch (state) {
                 case Owned -> handleOwnEvent(serviceUnit, data);
@@ -851,7 +854,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
-    private void handleFreeEvent(String serviceUnit, ServiceUnitStateData 
data) {
+    private CompletableFuture<Void> handleFreeEvent(String serviceUnit, 
ServiceUnitStateData data) {
         var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(null);
@@ -865,8 +868,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                             : 
CompletableFuture.completedFuture(0)).thenApply(__ -> null);
             stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+            return future;
         } else {
             stateChangeListeners.notify(serviceUnit, data, null);
+            return CompletableFuture.completedFuture(null);
         }
     }
 
@@ -1273,7 +1278,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 return;
             }
         } catch (Exception e) {
-            log.error("Failed to handle broker deletion event.", e);
+            if (e instanceof ExecutionException && e.getCause() instanceof 
IllegalStateException) {
+                log.warn("Failed to handle broker deletion event due to {}", 
e.getMessage());
+            } else {
+                log.error("Failed to handle broker deletion event.", e);
+            }
             return;
         }
         MetadataState state = getMetadataState();
@@ -1293,6 +1302,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private void scheduleCleanup(String broker, long delayInSecs) {
         var scheduled = new MutableObject<CompletableFuture<Void>>();
         try {
+            final var channelState = this.channelState;
+            if (channelState == Disabled || channelState == Closed) {
+                log.warn("[{}] Skip scheduleCleanup because the state is {} 
now", brokerId, channelState);
+                return;
+            }
             cleanupJobs.computeIfAbsent(broker, k -> {
                 Executor delayed = CompletableFuture
                         .delayedExecutor(delayInSecs, TimeUnit.SECONDS, 
pulsar.getLoadManagerExecutor());
@@ -1393,6 +1407,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
                 if (data.state() == Owned && broker.equals(data.dstBroker())) {
                     cleaned = false;
+                    log.info("[{}] bundle {} is still owned by this, data: 
{}", broker, serviceUnit, data);
                     break;
                 }
             }
@@ -1400,10 +1415,15 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 break;
             } else {
                 try {
-                    
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
+                    
tableview.flush(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS / 2);
+                    Thread.sleep(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS / 
2);
                 } catch (InterruptedException e) {
                     log.warn("Interrupted while delaying the next service unit 
clean-up. Cleaning broker:{}",
                             brokerId);
+                } catch (ExecutionException e) {
+                    log.error("Failed to flush table view", e.getCause());
+                } catch (TimeoutException e) {
+                    log.warn("Failed to flush the table view in {} ms", 
OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
                 }
             }
         }
@@ -1428,6 +1448,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         log.info("Started ownership cleanup for the inactive broker:{}", 
broker);
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
+        try {
+            tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
+        } catch (Exception e) {
+            log.error("Failed to flush", e);
+        }
         Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new 
HashMap<>();
         for (var etr : tableview.entrySet()) {
             var stateData = etr.getValue();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index e85134e6116..4a990ddbc9b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -34,7 +34,7 @@ public record ServiceUnitStateData(
 
     public ServiceUnitStateData {
         Objects.requireNonNull(state);
-        if (StringUtils.isBlank(dstBroker) && 
StringUtils.isBlank(sourceBroker)) {
+        if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker) 
&& StringUtils.isBlank(sourceBroker)) {
             throw new IllegalArgumentException("Empty broker");
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
index b1dbb6fac87..3e43237f4c0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
@@ -145,4 +145,4 @@ public class ServiceUnitStateDataConflictResolver 
implements TopicCompactionStra
                 || !from.dstBroker().equals(to.sourceBroker())
                 || from.dstBroker().equals(to.dstBroker());
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
index 8dfaddcdabc..12cf87445a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -144,8 +145,13 @@ public class ServiceUnitStateTableViewImpl extends 
ServiceUnitStateTableViewBase
                 .sendAsync()
                 .whenComplete((messageId, e) -> {
                     if (e != null) {
-                        log.error("Failed to publish the message: 
serviceUnit:{}, data:{}",
-                                key, value, e);
+                        if (e instanceof 
PulsarClientException.AlreadyClosedException) {
+                            log.info("Skip publishing the message since the 
producer is closed, serviceUnit: {}, data: "
+                                    + "{}", key, value);
+                        } else {
+                            log.error("Failed to publish the message: 
serviceUnit:{}, data:{}",
+                                    key, value, e);
+                        }
                         future.completeExceptionally(e);
                     } else {
                         future.complete(null);
@@ -159,7 +165,14 @@ public class ServiceUnitStateTableViewImpl extends 
ServiceUnitStateTableViewBase
         if (!isValidState()) {
             throw new IllegalStateException(INVALID_STATE_ERROR_MSG);
         }
-        producer.flushAsync().get(waitDurationInMillis, MILLISECONDS);
+        final var deadline = System.currentTimeMillis() + waitDurationInMillis;
+        var waitTimeMs = waitDurationInMillis;
+        producer.flushAsync().get(waitTimeMs, MILLISECONDS);
+        waitTimeMs = deadline - System.currentTimeMillis();
+        if (waitTimeMs < 0) {
+            waitTimeMs = 0;
+        }
+        tableview.refreshAsync().get(waitTimeMs, MILLISECONDS);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index 48213c18e63..9863d05ee75 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -41,7 +41,12 @@ public class BrokerMaxTopicCountFilter implements 
BrokerFilter {
                                                                         
LoadManagerContext context) {
         int loadBalancerBrokerMaxTopics = 
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
         brokers.keySet().removeIf(broker -> {
-            Optional<BrokerLoadData> brokerLoadDataOpt = 
context.brokerLoadDataStore().get(broker);
+            final Optional<BrokerLoadData> brokerLoadDataOpt;
+            try {
+                brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
+            } catch (IllegalStateException ignored) {
+                return false;
+            }
             long topics = 
brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L);
             // TODO: The broker load data might be delayed, so the max topic 
check might not accurate.
             return topics >= loadBalancerBrokerMaxTopics;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index c9d18676cfa..3ce44a1e65a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -92,7 +92,11 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
     public synchronized Optional<T> get(String key) {
         String msg = validateTableView();
         if (StringUtils.isNotBlank(msg)) {
-            throw new IllegalStateException(msg);
+            if (msg.equals(SHUTDOWN_ERR_MSG)) {
+                return Optional.empty();
+            } else {
+                throw new IllegalStateException(msg);
+            }
         }
         return Optional.ofNullable(tableView.get(key));
     }
@@ -193,7 +197,9 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
 
     @Override
     public synchronized void close() throws IOException {
-        validateState();
+        if (isShutdown) {
+            return;
+        }
         closeProducer();
         closeTableView();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 09f04d878c4..bfa99eedcad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -973,7 +973,12 @@ public class BrokerService implements Closeable {
                             
pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS,
                                     
closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS);
                         } catch (Exception e) {
-                            log.warn("Failed to unload namespace bundle {}", 
su, e);
+                            if (e instanceof ExecutionException
+                                    && e.getCause() instanceof 
ServiceUnitNotReadyException) {
+                                log.warn("Failed to unload namespace bundle 
{}: {}", su, e.getMessage());
+                            } else {
+                                log.warn("Failed to unload namespace bundle 
{}", su, e);
+                            }
                         }
                     }
                 });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 18b4c610a5c..6ff6408916b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -254,7 +254,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         // initialization by calling this method. At the moment, the load 
manager does not start so the lookup
         // for "__change_events" will fail. In this case, just return an empty 
policies to avoid deadlock.
         final var loadManager = pulsarService.getLoadManager().get();
-        if (loadManager == null || !loadManager.started()) {
+        if (loadManager == null || !loadManager.started() || closed.get()) {
             return CompletableFuture.completedFuture(Optional.empty());
         }
         final CompletableFuture<Boolean> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
@@ -308,6 +308,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     @VisibleForTesting
     @Nonnull CompletableFuture<Boolean> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
         requireNonNull(namespace);
+        if (closed.get()) {
+            return CompletableFuture.completedFuture(false);
+        }
         return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
                         .thenCompose(namespacePolicies -> {
                             if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
@@ -331,6 +334,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                         });
                                 initFuture.exceptionally(ex -> {
                                     try {
+                                        if (closed.get()) {
+                                            return null;
+                                        }
                                         log.error("[{}] Failed to create reader on __change_events topic",
                                                 namespace, ex);
                                         cleanCacheAndCloseReader(namespace, false);
@@ -681,14 +687,22 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         if (closed.compareAndSet(false, true)) {
             writerCaches.synchronous().invalidateAll();
             readerCaches.values().forEach(future -> {
-                if (future != null && !future.isCompletedExceptionally()) {
-                    future.thenAccept(reader -> {
-                        try {
-                            reader.close();
-                        } catch (Exception e) {
-                            log.error("Failed to close reader.", e);
-                        }
-                    });
+                try {
+                    final var reader = future.getNow(null);
+                    if (reader != null) {
+                        reader.close();
+                        log.info("Closed the reader for topic policies");
+                    } else {
+                        // Avoid blocking the thread that the reader is created
+                        future.thenAccept(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) -> {
+                            if (e == null) {
+                                log.info("Closed the reader for topic policies");
+                            } else {
+                                log.error("Failed to close the reader for topic policies", e);
+                            }
+                        });
+                    }
+                } catch (Throwable ignored) {
                 }
             });
             readerCaches.clear();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index 41413f3e3a9..fa63ce566c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -22,13 +22,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -36,26 +38,33 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
+@Test(groups = "broker")
 public class ExtensibleLoadManagerCloseTest {
 
     private static final String clusterName = "test";
-    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 
0, () -> 0);
     private final List<PulsarService> brokers = new ArrayList<>();
-    private PulsarAdmin admin;
+    private LocalBookkeeperEnsemble bk;
 
     @BeforeClass(alwaysRun = true)
     public void setup() throws Exception {
+        bk = new LocalBookkeeperEnsemble(1, 0, () -> 0);
         bk.start();
-        for (int i = 0; i < 3; i++) {
+    }
+
+    private void setupBrokers(int numBrokers) throws Exception {
+        brokers.clear();
+        for (int i = 0; i < numBrokers; i++) {
             final var broker = new PulsarService(brokerConfig());
             broker.start();
             brokers.add(broker);
         }
-        admin = brokers.get(0).getAdminClient();
-        admin.clusters().createCluster(clusterName, ClusterData.builder().build());
-        admin.tenants().createTenant("public", TenantInfo.builder()
-                .allowedClusters(Collections.singleton(clusterName)).build());
-        admin.namespaces().createNamespace("public/default");
+        final var admin = brokers.get(0).getAdminClient();
+        if (!admin.clusters().getClusters().contains(clusterName)) {
+            admin.clusters().createCluster(clusterName, ClusterData.builder().build());
+            admin.tenants().createTenant("public", TenantInfo.builder()
+                    .allowedClusters(Collections.singleton(clusterName)).build());
+            admin.namespaces().createNamespace("public/default");
+        }
     }
 
 
@@ -85,7 +94,9 @@ public class ExtensibleLoadManagerCloseTest {
 
     @Test
     public void testCloseAfterLoadingBundles() throws Exception {
+        setupBrokers(3);
         final var topic = "test";
+        final var admin = brokers.get(0).getAdminClient();
         admin.topics().createPartitionedTopic(topic, 20);
         admin.lookups().lookupPartitionedTopic(topic);
         final var client = 
PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build();
@@ -104,4 +115,25 @@ public class ExtensibleLoadManagerCloseTest {
             Assert.assertTrue(closeTimeMs < 5000L);
         }
     }
+
+    @Test
+    public void testLookup() throws Exception {
+        setupBrokers(1);
+        final var topic = "test-lookup";
+        final var numPartitions = 16;
+        final var admin = brokers.get(0).getAdminClient();
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        final var futures = new ArrayList<CompletableFuture<String>>();
+        for (int i = 0; i < numPartitions; i++) {
+            futures.add(admin.lookups().lookupTopicAsync(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        final var start = System.currentTimeMillis();
+        brokers.get(0).close();
+        final var closeTimeMs = System.currentTimeMillis() - start;
+        log.info("Broker close time: {}", closeTimeMs);
+        Assert.assertTrue(closeTimeMs < 5000L);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 3267e67ad2c..820307637be 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -198,7 +198,7 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
 
         Assert.assertTrue(loadDataStore.pushAsync("2", 
2).isCompletedExceptionally());
         
Assert.assertTrue(loadDataStore.removeAsync("2").isCompletedExceptionally());
-        assertThrows(IllegalStateException.class, () -> 
loadDataStore.get("2"));
+        assertTrue(loadDataStore.get("2").isEmpty());
         assertThrows(IllegalStateException.class, loadDataStore::size);
         assertThrows(IllegalStateException.class, loadDataStore::entrySet);
         assertThrows(IllegalStateException.class, () -> 
loadDataStore.forEach((k, v) -> {}));
@@ -206,7 +206,6 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         assertThrows(IllegalStateException.class, loadDataStore::start);
         assertThrows(IllegalStateException.class, 
loadDataStore::startProducer);
         assertThrows(IllegalStateException.class, 
loadDataStore::startTableView);
-        assertThrows(IllegalStateException.class, loadDataStore::close);
         assertThrows(IllegalStateException.class, 
loadDataStore::closeTableView);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index d5d4174ee10..4f520604978 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -364,8 +364,8 @@ public class TableViewImpl<T> implements TableView<T> {
                                   }
                                }).exceptionally(ex -> {
                                    if (ex.getCause() instanceof 
PulsarClientException.AlreadyClosedException) {
-                                       log.error("Reader {} was closed while 
reading existing messages.",
-                                               reader.getTopic(), ex);
+                                       log.info("Reader {} was closed while 
reading existing messages.",
+                                               reader.getTopic());
                                    } else {
                                        log.warn("Reader {} was interrupted 
while reading existing messages. ",
                                                reader.getTopic(), ex);
@@ -393,8 +393,7 @@ public class TableViewImpl<T> implements TableView<T> {
                     readTailMessages(reader);
                 }).exceptionally(ex -> {
                     if (ex.getCause() instanceof 
PulsarClientException.AlreadyClosedException) {
-                        log.error("Reader {} was closed while reading tail 
messages.",
-                                reader.getTopic(), ex);
+                        log.info("Reader {} was closed while reading tail 
messages.", reader.getTopic());
                         // Fail all refresh request when no more messages can 
be read.
                         pendingRefreshRequests.keySet().forEach(future -> {
                             pendingRefreshRequests.remove(future);


Reply via email to