This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d42dacb2b2f [fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with extensible load manager  (#23349) (#23496)
d42dacb2b2f is described below

commit d42dacb2b2fab717ab83527ce38d8f5590391532
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Oct 23 04:25:22 2024 -0700

    [fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with extensible load manager  (#23349) (#23496)
    
    Co-authored-by: Yunze Xu <[email protected]>
---
 .../org/apache/pulsar/broker/PulsarService.java    |   7 +-
 .../extensions/ExtensibleLoadManagerImpl.java      | 132 +++++++++++++-----
 .../extensions/ExtensibleLoadManagerWrapper.java   |   4 +
 .../channel/ServiceUnitStateChannel.java           |   5 +
 .../channel/ServiceUnitStateChannelImpl.java       |  54 ++++++--
 .../ServiceUnitStateCompactionStrategy.java        |   2 +-
 .../extensions/channel/ServiceUnitStateData.java   |   2 +-
 .../filter/BrokerMaxTopicCountFilter.java          |   7 +-
 .../extensions/store/LoadDataStore.java            |   8 +-
 .../store/TableViewLoadDataStoreImpl.java          | 148 +++++++++++++++------
 .../pulsar/broker/service/BrokerService.java       |   7 +-
 .../SystemTopicBasedTopicPoliciesService.java      |  30 +++--
 .../extensions/ExtensibleLoadManagerCloseTest.java |  50 +++++--
 .../channel/ServiceUnitStateChannelTest.java       |   7 +-
 .../extensions/store/LoadDataStoreTest.java        |  60 +++++++--
 .../apache/pulsar/client/impl/TableViewImpl.java   |   7 +-
 16 files changed, 404 insertions(+), 126 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 559db301158..7698f3d119c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -463,6 +463,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 return closeFuture;
             }
             LOG.info("Closing PulsarService");
+            if (topicPoliciesService != null) {
+                topicPoliciesService.close();
+            }
             if (brokerService != null) {
                 brokerService.unloadNamespaceBundlesGracefully();
             }
@@ -578,10 +581,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 transactionBufferClient.close();
             }
 
-            if (topicPoliciesService != null) {
-                topicPoliciesService.close();
-                topicPoliciesService = null;
-            }
 
             if (client != null) {
                 client.close();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 8e34f2f697f..bf98df2c5ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -182,7 +182,14 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     private SplitManager splitManager;
 
-    private volatile boolean started = false;
+    enum State {
+        INIT,
+        RUNNING,
+        // It's removing visibility of the current broker from other brokers. 
In this state, it cannot play as a leader
+        // or follower.
+        DISABLED,
+    }
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.INIT);
 
     private boolean configuredSystemTopics = false;
 
@@ -210,7 +217,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
      * Get all the bundles that are owned by this broker.
      */
     public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() 
{
-        if (!started) {
+        if (state.get() == State.INIT) {
             log.warn("Failed to get owned service units, load manager is not 
started.");
             return CompletableFuture.completedFuture(Collections.emptySet());
         }
@@ -373,7 +380,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     @Override
     public void start() throws PulsarServerException {
-        if (this.started) {
+        if (state.get() != State.INIT) {
             return;
         }
         try {
@@ -471,7 +478,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
                     this.splitScheduler.start();
                     this.initWaiter.complete(true);
-                    this.started = true;
+                    if (!state.compareAndSet(State.INIT, State.RUNNING)) {
+                        failForUnexpectedState("start");
+                    }
                     log.info("Started load manager.");
                 } catch (Throwable e) {
                     failStarting(e);
@@ -643,21 +652,17 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                                 filter.filterAsync(availableBrokerCandidates, 
bundle, context);
                         futures.add(future);
                     }
-                    CompletableFuture<Optional<String>> result = new 
CompletableFuture<>();
-                    FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
-                        if (ex != null) {
-                            // TODO: We may need to revisit this error case.
-                            log.error("Failed to filter out brokers when 
select bundle: {}", bundle, ex);
-                        }
+                    return FutureUtil.waitForAll(futures).exceptionally(e -> {
+                        // TODO: We may need to revisit this error case.
+                        log.error("Failed to filter out brokers when select 
bundle: {}", bundle, e);
+                        return null;
+                    }).thenApply(__ -> {
                         if (availableBrokerCandidates.isEmpty()) {
-                            result.complete(Optional.empty());
-                            return;
+                            return Optional.empty();
                         }
                         Set<String> candidateBrokers = 
availableBrokerCandidates.keySet();
-
-                        
result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle, 
context));
+                        return 
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
                     });
-                    return result;
                 });
     }
 
@@ -695,6 +700,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                                                               boolean force,
                                                               long timeout,
                                                               TimeUnit 
timeoutUnit) {
+        if (state.get() == State.INIT) {
+            return CompletableFuture.completedFuture(null);
+        }
         if 
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
 {
             log.info("Skip unloading namespace bundle: {}.", bundle);
             return CompletableFuture.completedFuture(null);
@@ -783,28 +791,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     @Override
     public void close() throws PulsarServerException {
-        if (!this.started) {
+        if (state.get() == State.INIT) {
             return;
         }
         try {
-            if (brokerLoadDataReportTask != null) {
-                brokerLoadDataReportTask.cancel(true);
-            }
-
-            if (topBundlesLoadDataReportTask != null) {
-                topBundlesLoadDataReportTask.cancel(true);
-            }
-
-            if (monitorTask != null) {
-                monitorTask.cancel(true);
-            }
-
-            this.brokerLoadDataStore.close();
-            this.topBundlesLoadDataStore.close();
+            stopLoadDataReportTasks();
             this.unloadScheduler.close();
             this.splitScheduler.close();
-        } catch (IOException ex) {
-            throw new PulsarServerException(ex);
         } finally {
             try {
                 this.brokerRegistry.close();
@@ -818,7 +811,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                     } catch (Exception e) {
                         throw new PulsarServerException(e);
                     } finally {
-                        this.started = false;
+                        state.set(State.INIT);
                     }
                 }
 
@@ -826,6 +819,28 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         }
     }
 
+    private void stopLoadDataReportTasks() {
+        if (brokerLoadDataReportTask != null) {
+            brokerLoadDataReportTask.cancel(true);
+        }
+        if (topBundlesLoadDataReportTask != null) {
+            topBundlesLoadDataReportTask.cancel(true);
+        }
+        if (monitorTask != null) {
+            monitorTask.cancel(true);
+        }
+        try {
+            brokerLoadDataStore.shutdown();
+        } catch (IOException e) {
+            log.warn("Failed to shutdown brokerLoadDataStore", e);
+        }
+        try {
+            topBundlesLoadDataStore.shutdown();
+        } catch (IOException e) {
+            log.warn("Failed to shutdown topBundlesLoadDataStore", e);
+        }
+    }
+
     public static boolean isInternalTopic(String topic) {
         return INTERNAL_TOPICS.contains(topic)
                 || topic.startsWith(TOPIC)
@@ -841,13 +856,16 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         boolean becameFollower = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                if (!initWaiter.get()) {
+                if (!initWaiter.get() || disabled()) {
                     return;
                 }
                 if (!serviceUnitStateChannel.isChannelOwner()) {
                     becameFollower = true;
                     break;
                 }
+                if (disabled()) {
+                    return;
+                }
                 // Confirm the system topics have been created or create them 
if they do not exist.
                 // If the leader has changed, the new leader need to reset
                 // the local brokerService.topics (by this topic creations).
@@ -859,6 +877,11 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 serviceUnitStateChannel.scheduleOwnershipMonitor();
                 break;
             } catch (Throwable e) {
+                if (disabled()) {
+                    log.warn("The broker:{} failed to set the role but exit 
because it's disabled",
+                            pulsar.getBrokerId(), e);
+                    return;
+                }
                 log.warn("The broker:{} failed to set the role. Retrying {} th 
...",
                         pulsar.getBrokerId(), ++retry, e);
                 try {
@@ -870,6 +893,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 }
             }
         }
+        if (disabled()) {
+            return;
+        }
 
         if (becameFollower) {
             log.warn("The broker:{} became follower while initializing leader 
role.", pulsar.getBrokerId());
@@ -893,13 +919,16 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         boolean becameLeader = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
-                if (!initWaiter.get()) {
+                if (!initWaiter.get() || disabled()) {
                     return;
                 }
                 if (serviceUnitStateChannel.isChannelOwner()) {
                     becameLeader = true;
                     break;
                 }
+                if (disabled()) {
+                    return;
+                }
                 unloadScheduler.close();
                 serviceUnitStateChannel.cancelOwnershipMonitor();
                 closeInternalTopics();
@@ -908,6 +937,11 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 topBundlesLoadDataStore.startProducer();
                 break;
             } catch (Throwable e) {
+                if (disabled()) {
+                    log.warn("The broker:{} failed to set the role but exit 
because it's disabled",
+                            pulsar.getBrokerId(), e);
+                    return;
+                }
                 log.warn("The broker:{} failed to set the role. Retrying {} th 
...",
                         pulsar.getBrokerId(), ++retry, e);
                 try {
@@ -919,6 +953,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 }
             }
         }
+        if (disabled()) {
+            return;
+        }
 
         if (becameLeader) {
             log.warn("This broker:{} became leader while initializing follower 
role.", pulsar.getBrokerId());
@@ -997,9 +1034,20 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     }
 
     public void disableBroker() throws Exception {
+        // TopicDoesNotExistException might be thrown and it's not 
recoverable. Enable this flag to exit playFollower()
+        // or playLeader() quickly.
+        if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
+            failForUnexpectedState("disableBroker");
+        }
+        stopLoadDataReportTasks();
         serviceUnitStateChannel.cleanOwnerships();
-        leaderElectionService.close();
         brokerRegistry.unregister();
+        leaderElectionService.close();
+        final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
+                .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        if (availableBrokers.isEmpty()) {
+            close();
+        }
         // Close the internal topics (if owned any) after giving up the 
possible leader role,
         // so that the subsequent lookups could hit the next leader.
         closeInternalTopics();
@@ -1033,4 +1081,16 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     protected ServiceUnitStateChannel 
createServiceUnitStateChannel(PulsarService pulsar) {
         return new ServiceUnitStateChannelImpl(pulsar);
     }
+
+    private void failForUnexpectedState(String msg) {
+        throw new IllegalStateException("Failed to " + msg + ", state: " + 
state.get());
+    }
+
+    boolean running() {
+        return state.get() == State.RUNNING;
+    }
+
+    private boolean disabled() {
+        return state.get() == State.DISABLED;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 25eb27bc58d..35f6cfcbcf5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -50,6 +50,10 @@ public class ExtensibleLoadManagerWrapper implements 
LoadManager {
         loadManager.start();
     }
 
+    public boolean started() {
+        return loadManager.running() && 
loadManager.getServiceUnitStateChannel().started();
+    }
+
     @Override
     public void initialize(PulsarService pulsar) {
         loadManager.initialize(pulsar);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 9be76e1b0f4..6319fc332a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -43,6 +43,11 @@ public interface ServiceUnitStateChannel extends Closeable {
      */
     void start() throws PulsarServerException;
 
+    /**
+     * Whether the channel started.
+     */
+    boolean started();
+
     /**
      * Closes the ServiceUnitStateChannel.
      * @throws PulsarServerException if it fails to close the channel.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 1a4b6eb3dfb..1f2715a00ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -32,6 +32,7 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isInFlightState;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Closed;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Constructed;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Disabled;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Started;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
@@ -181,7 +182,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         Closed(0),
         Constructed(1),
         LeaderElectionServiceStarted(2),
-        Started(3);
+        Started(3),
+        Disabled(4);
 
         ChannelState(int id) {
             this.id = id;
@@ -260,11 +262,19 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
+
+
     @Override
     public void cleanOwnerships() {
+        disable();
         doCleanup(brokerId);
     }
 
+    @Override
+    public synchronized boolean started() {
+        return validateChannelState(Started, true);
+    }
+
     public synchronized void start() throws PulsarServerException {
         if (!validateChannelState(LeaderElectionServiceStarted, false)) {
             throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
@@ -442,9 +452,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             if (owner.isPresent()) {
                 return isTargetBroker(owner.get());
             } else {
-                String msg = "There is no channel owner now.";
-                log.error(msg);
-                throw new IllegalStateException(msg);
+                throw new IllegalStateException("There is no channel owner 
now.");
             }
         });
     }
@@ -618,7 +626,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     public CompletableFuture<String> publishAssignEventAsync(String 
serviceUnit, String broker) {
-        if (!validateChannelState(Started, true)) {
+        if (!validateChannelState(Started, true) || channelState == Disabled) {
             return CompletableFuture.failedFuture(
                     new IllegalStateException("Invalid channel state:" + 
channelState.name()));
         }
@@ -699,6 +707,14 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
 
         ServiceUnitState state = state(data);
+        if (channelState == Disabled && (data == null || !data.force())) {
+            final var request = getOwnerRequests.remove(serviceUnit);
+            if (request != null) {
+                request.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(
+                        "cancel the lookup request for " + serviceUnit + " 
when receiving " + state));
+            }
+            return;
+        }
         try {
             switch (state) {
                 case Owned -> handleOwnEvent(serviceUnit, data);
@@ -866,7 +882,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
-    private void handleFreeEvent(String serviceUnit, ServiceUnitStateData 
data) {
+    private CompletableFuture<Void> handleFreeEvent(String serviceUnit, 
ServiceUnitStateData data) {
         var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(null);
@@ -880,8 +896,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                             : 
CompletableFuture.completedFuture(0)).thenApply(__ -> null);
             stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+            return future;
         } else {
             stateChangeListeners.notify(serviceUnit, data, null);
+            return CompletableFuture.completedFuture(null);
         }
     }
 
@@ -1258,8 +1276,17 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     private void handleBrokerDeletionEvent(String broker) {
-        if (!isChannelOwner()) {
-            log.warn("This broker is not the leader now. Ignoring 
BrokerDeletionEvent for broker {}.", broker);
+        try {
+            if (!isChannelOwner()) {
+                log.warn("This broker is not the leader now. Ignoring 
BrokerDeletionEvent for broker {}.", broker);
+                return;
+            }
+        } catch (Exception e) {
+            if (e instanceof ExecutionException && e.getCause() instanceof 
IllegalStateException) {
+                log.warn("Failed to handle broker deletion event due to {}", 
e.getMessage());
+            } else {
+                log.error("Failed to handle broker deletion event.", e);
+            }
             return;
         }
         MetadataState state = getMetadataState();
@@ -1279,6 +1306,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private void scheduleCleanup(String broker, long delayInSecs) {
         var scheduled = new MutableObject<CompletableFuture<Void>>();
         try {
+            final var channelState = this.channelState;
+            if (channelState == Disabled || channelState == Closed) {
+                log.warn("[{}] Skip scheduleCleanup because the state is {} 
now", brokerId, channelState);
+                return;
+            }
             cleanupJobs.computeIfAbsent(broker, k -> {
                 Executor delayed = CompletableFuture
                         .delayedExecutor(delayInSecs, TimeUnit.SECONDS, 
pulsar.getLoadManagerExecutor());
@@ -1371,6 +1403,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
                 if (data.state() == Owned && broker.equals(data.dstBroker())) {
                     cleaned = false;
+                    log.info("[{}] bundle {} is still owned by this, data: 
{}", broker, serviceUnit, data);
                     break;
                 }
             }
@@ -1784,4 +1817,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     public static ServiceUnitStateChannel get(PulsarService pulsar) {
         return 
ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get()).getServiceUnitStateChannel();
     }
+
+        @VisibleForTesting
+        protected void disable() {
+            channelState = Disabled;
+        }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
index 6a98b79be81..7dd1035b220 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -131,4 +131,4 @@ public class ServiceUnitStateCompactionStrategy implements 
TopicCompactionStrate
                 || !from.dstBroker().equals(to.sourceBroker())
                 || from.dstBroker().equals(to.dstBroker());
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index 307d3a4acb1..61a1606ba6e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -34,7 +34,7 @@ public record ServiceUnitStateData(
 
     public ServiceUnitStateData {
         Objects.requireNonNull(state);
-        if (StringUtils.isBlank(dstBroker) && 
StringUtils.isBlank(sourceBroker)) {
+        if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker) 
&& StringUtils.isBlank(sourceBroker)) {
             throw new IllegalArgumentException("Empty broker");
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index 48213c18e63..9863d05ee75 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -41,7 +41,12 @@ public class BrokerMaxTopicCountFilter implements 
BrokerFilter {
                                                                         
LoadManagerContext context) {
         int loadBalancerBrokerMaxTopics = 
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
         brokers.keySet().removeIf(broker -> {
-            Optional<BrokerLoadData> brokerLoadDataOpt = 
context.brokerLoadDataStore().get(broker);
+            final Optional<BrokerLoadData> brokerLoadDataOpt;
+            try {
+                brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
+            } catch (IllegalStateException ignored) {
+                return false;
+            }
             long topics = 
brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L);
             // TODO: The broker load data might be delayed, so the max topic 
check might not accurate.
             return topics >= loadBalancerBrokerMaxTopics;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index a7deeeaad8a..157d6a107c5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -103,4 +103,10 @@ public interface LoadDataStore<T> extends Closeable {
      */
     void startProducer() throws LoadDataStoreException;
 
-}
+    /**
+     * Shutdowns the data store.
+     */
+    default void shutdown() throws IOException {
+        close();
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index e9289d3ccda..672a9a66af0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -43,20 +43,17 @@ import org.apache.pulsar.client.api.TableView;
 public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
 
     private static final long 
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
+    private static final String SHUTDOWN_ERR_MSG = "This load store tableview 
has been shutdown";
     private static final long INIT_TIMEOUT_IN_SECS = 5;
-
     private volatile TableView<T> tableView;
     private volatile long tableViewLastUpdateTimestamp;
-
+    private volatile long producerLastPublishTimestamp;
     private volatile Producer<T> producer;
-
     private final ServiceConfiguration conf;
-
     private final PulsarClient client;
-
     private final String topic;
-
     private final Class<T> clazz;
+    private volatile boolean isShutdown;
 
     public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, 
Class<T> clazz)
             throws LoadDataStoreException {
@@ -65,6 +62,7 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
             this.client = pulsar.getClient();
             this.topic = topic;
             this.clazz = clazz;
+            this.isShutdown = false;
         } catch (Exception e) {
             throw new LoadDataStoreException(e);
         }
@@ -72,41 +70,80 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
 
     @Override
     public synchronized CompletableFuture<Void> pushAsync(String key, T 
loadData) {
-        validateProducer();
-        return 
producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
+        String msg = validateProducer();
+        if (StringUtils.isNotBlank(msg)) {
+            return CompletableFuture.failedFuture(new 
IllegalStateException(msg));
+        }
+        return producer.newMessage().key(key).value(loadData).sendAsync()
+                .thenAccept(__ -> producerLastPublishTimestamp = 
System.currentTimeMillis());
     }
 
     @Override
     public synchronized CompletableFuture<Void> removeAsync(String key) {
-        validateProducer();
-        return 
producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
+        String msg = validateProducer();
+        if (StringUtils.isNotBlank(msg)) {
+            return CompletableFuture.failedFuture(new 
IllegalStateException(msg));
+        }
+        return producer.newMessage().key(key).value(null).sendAsync()
+                .thenAccept(__ -> producerLastPublishTimestamp = 
System.currentTimeMillis());
     }
 
     @Override
     public synchronized Optional<T> get(String key) {
-        validateTableView();
+        String msg = validateTableView();
+        if (StringUtils.isNotBlank(msg)) {
+            if (msg.equals(SHUTDOWN_ERR_MSG)) {
+                return Optional.empty();
+            } else {
+                throw new IllegalStateException(msg);
+            }
+        }
         return Optional.ofNullable(tableView.get(key));
     }
 
     @Override
     public synchronized void forEach(BiConsumer<String, T> action) {
-        validateTableView();
+        String msg = validateTableView();
+        if (StringUtils.isNotBlank(msg)) {
+            throw new IllegalStateException(msg);
+        }
         tableView.forEach(action);
     }
 
     public synchronized Set<Map.Entry<String, T>> entrySet() {
-        validateTableView();
+        String msg = validateTableView();
+        if (StringUtils.isNotBlank(msg)) {
+            throw new IllegalStateException(msg);
+        }
         return tableView.entrySet();
     }
 
     @Override
     public synchronized int size() {
-        validateTableView();
+        String msg = validateTableView();
+        if (StringUtils.isNotBlank(msg)) {
+            throw new IllegalStateException(msg);
+        }
         return tableView.size();
     }
 
+    private void validateState() {
+        if (isShutdown) {
+            throw new IllegalStateException(SHUTDOWN_ERR_MSG);
+        }
+    }
+
+
+    @Override
+    public synchronized void init() throws IOException {
+        validateState();
+        close();
+        start();
+    }
+
     @Override
     public synchronized void closeTableView() throws IOException {
+        validateState();
         if (tableView != null) {
             tableView.close();
             tableView = null;
@@ -115,16 +152,26 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
 
     @Override
     public synchronized void start() throws LoadDataStoreException {
+        validateState();
         startProducer();
         startTableView();
     }
 
+    private synchronized void closeProducer() throws IOException {
+        validateState();
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
+    }
     @Override
     public synchronized void startTableView() throws LoadDataStoreException {
+        validateState();
         if (tableView == null) {
             try {
                 tableView = 
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
                         .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+                tableViewLastUpdateTimestamp = System.currentTimeMillis();
                 tableView.forEachAndListen((k, v) ->
                         tableViewLastUpdateTimestamp = 
System.currentTimeMillis());
             } catch (Exception e) {
@@ -133,13 +180,14 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
             }
         }
     }
-
     @Override
     public synchronized void startProducer() throws LoadDataStoreException {
+        validateState();
         if (producer == null) {
             try {
                 producer = 
client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
                         .get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+                producerLastPublishTimestamp = System.currentTimeMillis();
             } catch (Exception e) {
                 producer = null;
                 throw new LoadDataStoreException(e);
@@ -149,38 +197,65 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
 
     @Override
     public synchronized void close() throws IOException {
-        if (producer != null) {
-            producer.close();
-            producer = null;
+        if (isShutdown) {
+            return;
         }
+        closeProducer();
         closeTableView();
     }
 
     @Override
-    public synchronized void init() throws IOException {
+    public synchronized void shutdown() throws IOException {
         close();
-        start();
+        isShutdown = true;
     }
 
-    private void validateProducer() {
-        if (producer == null) {
+    private String validateProducer() {
+        if (isShutdown) {
+            return SHUTDOWN_ERR_MSG;
+        }
+        String restartReason = getRestartReason(producer, 
producerLastPublishTimestamp);
+        if (StringUtils.isNotBlank(restartReason)) {
             try {
+                closeProducer();
                 startProducer();
-                log.info("Restarted producer on {}", topic);
+                log.info("Restarted producer on {}, {}", topic, restartReason);
             } catch (Exception e) {
-                log.error("Failed to restart producer on {}", topic, e);
-                throw new RuntimeException(e);
+                String msg = "Failed to restart producer on " + topic + ", 
restart reason: " + restartReason;
+                log.error(msg, e);
+                return msg;
             }
         }
+        return null;
     }
 
-    private void validateTableView() {
+    private String validateTableView() {
+        if (isShutdown) {
+            return SHUTDOWN_ERR_MSG;
+        }
+        String restartReason = getRestartReason(tableView, 
tableViewLastUpdateTimestamp);
+        if (StringUtils.isNotBlank(restartReason)) {
+            try {
+                closeTableView();
+                startTableView();
+                log.info("Restarted tableview on {}, {}", topic, 
restartReason);
+            } catch (Exception e) {
+                String msg = "Failed to tableview on " + topic + ", restart 
reason: " + restartReason;
+                log.error(msg, e);
+                return msg;
+            }
+        }
+        return null;
+    }
+
+    private String getRestartReason(Object obj, long lastUpdateTimestamp) {
+
         String restartReason = null;
 
-        if (tableView == null) {
-            restartReason = "table view is null";
+        if (obj == null) {
+            restartReason = "object is null";
         } else {
-            long inactiveDuration = System.currentTimeMillis() - 
tableViewLastUpdateTimestamp;
+            long inactiveDuration = System.currentTimeMillis() - 
lastUpdateTimestamp;
             long threshold = 
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes())
                     * 
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART;
             if (inactiveDuration > threshold) {
@@ -189,17 +264,6 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
                         TimeUnit.MILLISECONDS.toSeconds(threshold));
             }
         }
-
-        if (StringUtils.isNotBlank(restartReason)) {
-            tableViewLastUpdateTimestamp = 0;
-            try {
-                closeTableView();
-                startTableView();
-                log.info("Restarted tableview on {}, {}", topic, 
restartReason);
-            } catch (Exception e) {
-                log.error("Failed to restart tableview on {}", topic, e);
-                throw new RuntimeException(e);
-            }
-        }
+        return restartReason;
     }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b4885ffcb6f..cb121b66356 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -978,7 +978,12 @@ public class BrokerService implements Closeable {
                             
pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS,
                                     
closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS);
                         } catch (Exception e) {
-                            log.warn("Failed to unload namespace bundle {}", 
su, e);
+                            if (e instanceof ExecutionException
+                                    && e.getCause() instanceof 
ServiceUnitNotReadyException) {
+                                log.warn("Failed to unload namespace bundle 
{}: {}", su, e.getMessage());
+                            } else {
+                                log.warn("Failed to unload namespace bundle 
{}", su, e);
+                            }
                         }
                     }
                 });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 5156246bb5e..8054d781976 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -349,6 +349,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     @VisibleForTesting
     @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull 
NamespaceName namespace) {
         requireNonNull(namespace);
+        if (closed.get()) {
+            return CompletableFuture.completedFuture(null);
+        }
         return 
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
                         .thenCompose(namespacePolicies -> {
                             if (namespacePolicies.isEmpty() || 
namespacePolicies.get().deleted) {
@@ -372,6 +375,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                         });
                                 initFuture.exceptionally(ex -> {
                                     try {
+                                        if (closed.get()) {
+                                            return null;
+                                        }
                                         log.error("[{}] Failed to create 
reader on __change_events topic",
                                                 namespace, ex);
                                         cleanCacheAndCloseReader(namespace, 
false);
@@ -779,14 +785,22 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         if (closed.compareAndSet(false, true)) {
             writerCaches.synchronous().invalidateAll();
             readerCaches.values().forEach(future -> {
-                if (future != null && !future.isCompletedExceptionally()) {
-                    future.thenAccept(reader -> {
-                        try {
-                            reader.close();
-                        } catch (Exception e) {
-                            log.error("Failed to close reader.", e);
-                        }
-                    });
+                try {
+                    final var reader = future.getNow(null);
+                    if (reader != null) {
+                        reader.close();
+                        log.info("Closed the reader for topic policies");
+                    } else {
+                        // Avoid blocking the thread on which the reader is created
+                        
future.thenAccept(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) -> 
{
+                            if (e == null) {
+                                log.info("Closed the reader for topic 
policies");
+                            } else {
+                                log.error("Failed to close the reader for 
topic policies", e);
+                            }
+                        });
+                    }
+                } catch (Throwable ignored) {
                 }
             });
             readerCaches.clear();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index 41413f3e3a9..fa63ce566c6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -22,13 +22,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -36,26 +38,33 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
+@Test(groups = "broker")
 public class ExtensibleLoadManagerCloseTest {
 
     private static final String clusterName = "test";
-    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 
0, () -> 0);
     private final List<PulsarService> brokers = new ArrayList<>();
-    private PulsarAdmin admin;
+    private LocalBookkeeperEnsemble bk;
 
     @BeforeClass(alwaysRun = true)
     public void setup() throws Exception {
+        bk = new LocalBookkeeperEnsemble(1, 0, () -> 0);
         bk.start();
-        for (int i = 0; i < 3; i++) {
+    }
+
+    private void setupBrokers(int numBrokers) throws Exception {
+        brokers.clear();
+        for (int i = 0; i < numBrokers; i++) {
             final var broker = new PulsarService(brokerConfig());
             broker.start();
             brokers.add(broker);
         }
-        admin = brokers.get(0).getAdminClient();
-        admin.clusters().createCluster(clusterName, 
ClusterData.builder().build());
-        admin.tenants().createTenant("public", TenantInfo.builder()
-                .allowedClusters(Collections.singleton(clusterName)).build());
-        admin.namespaces().createNamespace("public/default");
+        final var admin = brokers.get(0).getAdminClient();
+        if (!admin.clusters().getClusters().contains(clusterName)) {
+            admin.clusters().createCluster(clusterName, 
ClusterData.builder().build());
+            admin.tenants().createTenant("public", TenantInfo.builder()
+                    
.allowedClusters(Collections.singleton(clusterName)).build());
+            admin.namespaces().createNamespace("public/default");
+        }
     }
 
 
@@ -85,7 +94,9 @@ public class ExtensibleLoadManagerCloseTest {
 
     @Test
     public void testCloseAfterLoadingBundles() throws Exception {
+        setupBrokers(3);
         final var topic = "test";
+        final var admin = brokers.get(0).getAdminClient();
         admin.topics().createPartitionedTopic(topic, 20);
         admin.lookups().lookupPartitionedTopic(topic);
         final var client = 
PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build();
@@ -104,4 +115,25 @@ public class ExtensibleLoadManagerCloseTest {
             Assert.assertTrue(closeTimeMs < 5000L);
         }
     }
+
+    @Test
+    public void testLookup() throws Exception {
+        setupBrokers(1);
+        final var topic = "test-lookup";
+        final var numPartitions = 16;
+        final var admin = brokers.get(0).getAdminClient();
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        final var futures = new ArrayList<CompletableFuture<String>>();
+        for (int i = 0; i < numPartitions; i++) {
+            futures.add(admin.lookups().lookupTopicAsync(topic + 
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        final var start = System.currentTimeMillis();
+        brokers.get(0).close();
+        final var closeTimeMs = System.currentTimeMillis() - start;
+        log.info("Broker close time: {}", closeTimeMs);
+        Assert.assertTrue(closeTimeMs < 5000L);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index e569f0d32d5..af42649436b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -1551,6 +1551,9 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(Optional.empty(), 
channel2.getOwnerAsync(freeBundle).get());
         
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
         assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());
+        ServiceUnitStateChannel finalLeaderChannel = leaderChannel;
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> getCleanupJobs(finalLeaderChannel).isEmpty());
 
         // clean-up
         FieldUtils.writeDeclaredField(leaderChannel, 
"maxCleanupDelayTimeInSecs", 3 * 60, true);
@@ -1621,8 +1624,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 .monitorOwnerships(List.of(brokerId1, brokerId2, "broker-3"));
 
         ServiceUnitStateChannel finalLeaderChannel = leaderChannel;
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
getCleanupJobs(finalLeaderChannel).isEmpty());
-
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> getCleanupJobs(finalLeaderChannel).isEmpty());
 
         waitUntilNewOwner(channel2, releasingBundle1, brokerId2);
         waitUntilNewOwner(channel2, releasingBundle2, brokerId2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index d25cba2bd1b..6d4e0f00c93 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.store;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
 import static org.testng.AssertJUnit.assertTrue;
 
 import com.google.common.collect.Sets;
@@ -33,6 +37,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -75,8 +80,6 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         @Cleanup
         LoadDataStore<MyClass> loadDataStore =
                 LoadDataStoreFactory.create(pulsar, topic, MyClass.class);
-        loadDataStore.startProducer();
-        loadDataStore.startTableView();
         MyClass myClass1 = new MyClass("1", 1);
         loadDataStore.pushAsync("key1", myClass1).get();
 
@@ -109,8 +112,6 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         @Cleanup
         LoadDataStore<Integer> loadDataStore =
                 LoadDataStoreFactory.create(pulsar, topic, Integer.class);
-        loadDataStore.startProducer();
-        loadDataStore.startTableView();
 
         Map<String, Integer> map = new HashMap<>();
         for (int i = 0; i < 10; i++) {
@@ -134,9 +135,6 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         String topic = TopicDomain.persistent + "://" + 
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
         LoadDataStore<Integer> loadDataStore =
                 LoadDataStoreFactory.create(pulsar, topic, Integer.class);
-        loadDataStore.startProducer();
-
-        loadDataStore.startTableView();
         loadDataStore.pushAsync("1", 1).get();
         Awaitility.await().untilAsserted(() -> 
assertEquals(loadDataStore.size(), 1));
         assertEquals(loadDataStore.get("1").get(), 1);
@@ -150,6 +148,31 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         Awaitility.await().untilAsserted(() -> 
assertEquals(loadDataStore.get("1").get(), 3));
     }
 
+    @Test
+    public void testProducerRestart() throws Exception {
+        String topic = TopicDomain.persistent + "://" + 
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+        var loadDataStore =
+                (TableViewLoadDataStoreImpl) 
spy(LoadDataStoreFactory.create(pulsar, topic, Integer.class));
+
+        // happy case
+        loadDataStore.pushAsync("1", 1).get();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(loadDataStore.size(), 1));
+        assertEquals(loadDataStore.get("1").get(), 1);
+        verify(loadDataStore, times(1)).startProducer();
+
+        // loadDataStore will restart producer if null.
+        FieldUtils.writeField(loadDataStore, "producer", null, true);
+        loadDataStore.pushAsync("1", 2).get();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(loadDataStore.get("1").get(), 2));
+        verify(loadDataStore, times(2)).startProducer();
+
+        // loadDataStore will restart producer if too slow.
+        FieldUtils.writeField(loadDataStore, "producerLastPublishTimestamp", 0 
, true);
+        loadDataStore.pushAsync("1", 3).get();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(loadDataStore.get("1").get(), 3));
+        verify(loadDataStore, times(3)).startProducer();
+    }
+
     @Test
     public void testProducerStop() throws Exception {
         String topic = TopicDomain.persistent + "://" + 
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
@@ -165,4 +188,25 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         loadDataStore.removeAsync("2").get();
     }
 
-}
+    @Test
+    public void testShutdown() throws Exception {
+        String topic = TopicDomain.persistent + "://" + 
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+        LoadDataStore<Integer> loadDataStore =
+                LoadDataStoreFactory.create(pulsar, topic, Integer.class);
+        loadDataStore.start();
+        loadDataStore.shutdown();
+
+        Assert.assertTrue(loadDataStore.pushAsync("2", 
2).isCompletedExceptionally());
+        
Assert.assertTrue(loadDataStore.removeAsync("2").isCompletedExceptionally());
+        assertTrue(loadDataStore.get("2").isEmpty());
+        assertThrows(IllegalStateException.class, loadDataStore::size);
+        assertThrows(IllegalStateException.class, loadDataStore::entrySet);
+        assertThrows(IllegalStateException.class, () -> 
loadDataStore.forEach((k, v) -> {}));
+        assertThrows(IllegalStateException.class, loadDataStore::init);
+        assertThrows(IllegalStateException.class, loadDataStore::start);
+        assertThrows(IllegalStateException.class, 
loadDataStore::startProducer);
+        assertThrows(IllegalStateException.class, 
loadDataStore::startTableView);
+        assertThrows(IllegalStateException.class, 
loadDataStore::closeTableView);
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index d5d4174ee10..4f520604978 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -364,8 +364,8 @@ public class TableViewImpl<T> implements TableView<T> {
                                   }
                                }).exceptionally(ex -> {
                                    if (ex.getCause() instanceof 
PulsarClientException.AlreadyClosedException) {
-                                       log.error("Reader {} was closed while 
reading existing messages.",
-                                               reader.getTopic(), ex);
+                                       log.info("Reader {} was closed while 
reading existing messages.",
+                                               reader.getTopic());
                                    } else {
                                        log.warn("Reader {} was interrupted 
while reading existing messages. ",
                                                reader.getTopic(), ex);
@@ -393,8 +393,7 @@ public class TableViewImpl<T> implements TableView<T> {
                     readTailMessages(reader);
                 }).exceptionally(ex -> {
                     if (ex.getCause() instanceof 
PulsarClientException.AlreadyClosedException) {
-                        log.error("Reader {} was closed while reading tail 
messages.",
-                                reader.getTopic(), ex);
+                        log.info("Reader {} was closed while reading tail 
messages.", reader.getTopic());
                         // Fail all refresh request when no more messages can 
be read.
                         pendingRefreshRequests.keySet().forEach(future -> {
                             pendingRefreshRequests.remove(future);

Reply via email to