This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cff1238465 [improve][broker] PIP-192 Made only the leader consume TopBundlesLoadDataStore (#19730)
8cff1238465 is described below

commit 8cff1238465ca8b1315d8b5b48388d4b61ec74b6
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 15 17:22:12 2023 -0700

    [improve][broker] PIP-192 Made only the leader consume TopBundlesLoadDataStore (#19730)
    
    Master Issue: https://github.com/apache/pulsar/issues/16691
    
    ### Motivation
    
    Raising a PR to implement https://github.com/apache/pulsar/issues/16691.
    
    Based on the current design, only the leader needs to consume the top-k bundle load data.
    
    ### Modifications
    This PR
    - made only the leader consume TopBundlesLoadDataStore
    - moved LeaderElectionService from ServiceUnitStateChannel to ExtensibleLoadManager.
---
 .../extensions/ExtensibleLoadManagerImpl.java      | 113 +++++++++++++-
 .../channel/ServiceUnitStateChannel.java           |   9 ++
 .../channel/ServiceUnitStateChannelImpl.java       |  79 +++++-----
 .../extensions/scheduler/UnloadScheduler.java      |  11 +-
 .../extensions/store/LoadDataStore.java            |  12 ++
 .../store/TableViewLoadDataStoreImpl.java          |  46 +++++-
 .../extensions/ExtensibleLoadManagerImplTest.java  | 169 +++++++++++++++++++--
 .../channel/ServiceUnitStateChannelTest.java       |  15 ++
 .../extensions/filter/BrokerFilterTestBase.java    |  11 ++
 .../extensions/scheduler/TransferShedderTest.java  |  21 +++
 .../extensions/store/LoadDataStoreTest.java        |  27 ++++
 .../strategy/LeastResourceUsageWithWeightTest.java |  12 ++
 12 files changed, 465 insertions(+), 60 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 7895f06fd50..716be3718bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions;
 
+import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,6 +29,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -35,6 +39,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
@@ -70,6 +75,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 
 @Slf4j
 public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@@ -84,6 +90,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             NamespaceName.SYSTEM_NAMESPACE,
             "loadbalancer-top-bundles-load-data").toString();
 
+    private static final long MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS = 200;
+
     private PulsarService pulsar;
 
     private ServiceConfiguration conf;
@@ -102,6 +110,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
 
     private LoadManagerScheduler unloadScheduler;
 
+    @Getter
+    private LeaderElectionService leaderElectionService;
+
     @Getter
     private LoadManagerContext context;
 
@@ -142,7 +153,14 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             lookupRequests = ConcurrentOpenHashMap.<String,
                     CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
             .build();
+    private final CountDownLatch loadStoreInitWaiter = new CountDownLatch(1);
+
+    public enum Role {
+        Leader,
+        Follower
+    }
 
+    private Role role;
 
     /**
      * Life cycle: Constructor -> initialize -> start -> close.
@@ -173,12 +191,24 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             return;
         }
         this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+        this.leaderElectionService = new LeaderElectionService(
+                pulsar.getCoordinationService(), 
pulsar.getSafeWebServiceAddress(),
+                state -> {
+                    pulsar.getLoadManagerExecutor().execute(() -> {
+                        if (state == LeaderElectionState.Leading) {
+                            playLeader();
+                        } else {
+                            playFollower();
+                        }
+                    });
+                });
         this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
         this.brokerRegistry.start();
         this.unloadManager = new UnloadManager();
         this.splitManager = new SplitManager(splitCounter);
         this.serviceUnitStateChannel.listen(unloadManager);
         this.serviceUnitStateChannel.listen(splitManager);
+        this.leaderElectionService.start();
         this.serviceUnitStateChannel.start();
         this.antiAffinityGroupPolicyHelper =
                 new AntiAffinityGroupPolicyHelper(pulsar, 
serviceUnitStateChannel);
@@ -189,8 +219,10 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         try {
             this.brokerLoadDataStore = LoadDataStoreFactory
                     .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, 
BrokerLoadData.class);
+            this.brokerLoadDataStore.startTableView();
             this.topBundlesLoadDataStore = LoadDataStoreFactory
                     .create(pulsar.getClient(), 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+            this.loadStoreInitWaiter.countDown();
         } catch (LoadDataStoreException e) {
             throw new PulsarServerException(e);
         }
@@ -409,8 +441,15 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                     this.serviceUnitStateChannel.close();
                 } finally {
                     this.unloadManager.close();
-                    this.started = false;
+                    try {
+                        this.leaderElectionService.close();
+                    } catch (Exception e) {
+                        throw new PulsarServerException(e);
+                    } finally {
+                        this.started = false;
+                    }
                 }
+
             }
         }
     }
@@ -421,6 +460,78 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
     }
 
+    @VisibleForTesting
+    void playLeader() {
+        if (role != Leader) {
+            log.info("This broker:{} is changing the role from {} to {}",
+                    pulsar.getLookupServiceAddress(), role, Leader);
+            int retry = 0;
+            while (true) {
+                try {
+                    serviceUnitStateChannel.scheduleOwnershipMonitor();
+                    loadStoreInitWaiter.await();
+                    topBundlesLoadDataStore.startTableView();
+                    unloadScheduler.start();
+                    break;
+                } catch (Throwable e) {
+                    log.error("The broker:{} failed to change the role. 
Retrying {} th ...",
+                            pulsar.getLookupServiceAddress(), ++retry, e);
+                    try {
+                        Thread.sleep(Math.min(retry * 10, 
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+                    } catch (InterruptedException ex) {
+                        log.warn("Interrupted while sleeping.");
+                    }
+                }
+            }
+            role = Leader;
+            log.info("This broker:{} plays the leader now.", 
pulsar.getLookupServiceAddress());
+        }
+
+        // flush the load data when the leader is elected.
+        if (brokerLoadDataReporter != null) {
+            brokerLoadDataReporter.reportAsync(true);
+        }
+        if (topBundleLoadDataReporter != null) {
+            topBundleLoadDataReporter.reportAsync(true);
+        }
+    }
+
+    @VisibleForTesting
+    void playFollower() {
+        if (role != Follower) {
+            log.info("This broker:{} is changing the role from {} to {}",
+                    pulsar.getLookupServiceAddress(), role, Follower);
+            int retry = 0;
+            while (true) {
+                try {
+                    serviceUnitStateChannel.cancelOwnershipMonitor();
+                    loadStoreInitWaiter.await();
+                    topBundlesLoadDataStore.closeTableView();
+                    unloadScheduler.close();
+                    break;
+                } catch (Throwable e) {
+                    log.error("The broker:{} failed to change the role. 
Retrying {} th ...",
+                            pulsar.getLookupServiceAddress(), ++retry, e);
+                    try {
+                        Thread.sleep(Math.min(retry * 10, 
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+                    } catch (InterruptedException ex) {
+                        log.warn("Interrupted while sleeping.");
+                    }
+                }
+            }
+            role = Follower;
+            log.info("This broker:{} plays a follower now.", 
pulsar.getLookupServiceAddress());
+        }
+
+        // flush the load data when the leader is elected.
+        if (brokerLoadDataReporter != null) {
+            brokerLoadDataReporter.reportAsync(true);
+        }
+        if (topBundleLoadDataReporter != null) {
+            topBundleLoadDataReporter.reportAsync(true);
+        }
+    }
+
     void updateBrokerLoadMetrics(BrokerLoadData loadData) {
         
this.brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 4e92ad791ab..719c72a67b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -172,4 +172,13 @@ public interface ServiceUnitStateChannel extends Closeable 
{
      */
     Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet();
 
+    /**
+     * Schedules ownership monitor to periodically check and correct invalid 
ownership states.
+     */
+    void scheduleOwnershipMonitor();
+
+    /**
+     * Cancels the ownership monitor.
+     */
+    void cancelOwnershipMonitor();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f8686c07f05..791d02649ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -96,7 +96,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
-import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 
 @Slf4j
@@ -119,9 +118,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private final String lookupServiceAddress;
     private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> 
cleanupJobs;
     private final StateChangeListeners stateChangeListeners;
-    private final LeaderElectionService leaderElectionService;
     private BrokerSelectionStrategy brokerSelector;
     private BrokerRegistry brokerRegistry;
+    private LeaderElectionService leaderElectionService;
     private TableView<ServiceUnitStateData> tableview;
     private Producer<ServiceUnitStateData> producer;
     private ScheduledFuture<?> monitorTask;
@@ -205,31 +204,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
         this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
         this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS;
-        this.leaderElectionService = new LeaderElectionService(
-                pulsar.getCoordinationService(), 
pulsar.getSafeWebServiceAddress(),
-                state -> {
-                    if (state == LeaderElectionState.Leading) {
-                        log.info("This broker:{} is the leader now.", 
lookupServiceAddress);
-                        this.monitorTask = this.pulsar.getLoadManagerExecutor()
-                                .scheduleWithFixedDelay(() -> {
-                                            try {
-                                                
monitorOwnerships(brokerRegistry.getAvailableBrokersAsync()
-                                                        
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS));
-                                            } catch (Exception e) {
-                                                log.info("Failed to monitor 
the ownerships. will retry..", e);
-                                            }
-                                        },
-                                        ownershipMonitorDelayTimeInSecs, 
ownershipMonitorDelayTimeInSecs, SECONDS);
-                    } else {
-                        log.info("This broker:{} is a follower now.", 
lookupServiceAddress);
-                        if (monitorTask != null) {
-                            monitorTask.cancel(false);
-                            monitorTask = null;
-                            log.info("This previous leader broker:{} stopped 
the channel clean-up monitor",
-                                    lookupServiceAddress);
-                        }
-                    }
-                });
+
         Map<ServiceUnitState, AtomicLong> tmpOwnerLookUpCounters = new 
HashMap<>();
         Map<ServiceUnitState, Counters> tmpHandlerCounters = new HashMap<>();
         Map<EventType, Counters> tmpEventCounters = new HashMap<>();
@@ -246,6 +221,32 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         this.channelState = Constructed;
     }
 
+    public void scheduleOwnershipMonitor() {
+        if (monitorTask == null) {
+            this.monitorTask = this.pulsar.getLoadManagerExecutor()
+                    .scheduleWithFixedDelay(() -> {
+                                try {
+                                    
monitorOwnerships(brokerRegistry.getAvailableBrokersAsync()
+                                            
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS));
+                                } catch (Exception e) {
+                                    log.info("Failed to monitor the 
ownerships. will retry..", e);
+                                }
+                            },
+                            ownershipMonitorDelayTimeInSecs, 
ownershipMonitorDelayTimeInSecs, SECONDS);
+            log.info("This leader broker:{} started the ownership monitor.",
+                    lookupServiceAddress);
+        }
+    }
+
+    public void cancelOwnershipMonitor() {
+        if (monitorTask != null) {
+            monitorTask.cancel(false);
+            monitorTask = null;
+            log.info("This previous leader broker:{} stopped the ownership 
monitor.",
+                    lookupServiceAddress);
+        }
+    }
+
     public synchronized void start() throws PulsarServerException {
         if (!validateChannelState(LeaderElectionServiceStarted, false)) {
             throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
@@ -255,11 +256,15 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         try {
             this.brokerRegistry = getBrokerRegistry();
             
this.brokerRegistry.addListener(this::handleBrokerRegistrationEvent);
-            leaderElectionService.start();
-            this.channelState = LeaderElectionServiceStarted;
-            if (debug) {
-                log.info("Successfully started the channel leader election 
service.");
+            this.leaderElectionService = getLeaderElectionService();
+            var leader = leaderElectionService.readCurrentLeader().get(
+                    MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, 
TimeUnit.SECONDS);
+            if (leader.isPresent()) {
+                log.info("Successfully found the channel leader:{}.", 
leader.get());
+            } else {
+                log.warn("Failed to find the channel leader.");
             }
+            this.channelState = LeaderElectionServiceStarted;
             brokerSelector = getBrokerSelector();
 
             if (producer != null) {
@@ -327,15 +332,17 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         return new LeastResourceUsageWithWeight();
     }
 
+    @VisibleForTesting
+    protected LeaderElectionService getLeaderElectionService() {
+        return ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get())
+                .get().getLeaderElectionService();
+    }
+
     public synchronized void close() throws PulsarServerException {
         channelState = Closed;
         boolean debug = debug();
         try {
-            leaderElectionService.close();
-            if (debug) {
-                log.info("Successfully closed the channel leader election 
service.");
-            }
-
+            leaderElectionService = null;
             if (tableview != null) {
                 tableview.close();
                 tableview = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
index e6460269787..bc3c8eb6a94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
@@ -156,16 +156,19 @@ public class UnloadScheduler implements 
LoadManagerScheduler {
 
     @Override
     public void start() {
-        long loadSheddingInterval = TimeUnit.MINUTES
-                .toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
-        this.task = loadManagerExecutor.scheduleAtFixedRate(
-                this::execute, loadSheddingInterval, loadSheddingInterval, 
TimeUnit.MILLISECONDS);
+        if (this.task == null) {
+            long loadSheddingInterval = TimeUnit.MINUTES
+                    .toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
+            this.task = loadManagerExecutor.scheduleAtFixedRate(
+                    this::execute, loadSheddingInterval, loadSheddingInterval, 
TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
     public void close() {
         if (this.task != null) {
             this.task.cancel(false);
+            this.task = null;
         }
         this.recentlyUnloadedBundles.clear();
         this.recentlyUnloadedBrokers.clear();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index 512811e1019..680a36523a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.extensions.store;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -74,4 +75,15 @@ public interface LoadDataStore<T> extends Closeable {
      */
     int size();
 
+
+    /**
+     * Closes the table view.
+     */
+    void closeTableView() throws IOException;
+
+    /**
+     * Starts the table view.
+     */
+    void startTableView() throws LoadDataStoreException;
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index 3909de2afa2..a400163ebf1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 
@@ -36,14 +37,22 @@ import org.apache.pulsar.client.api.TableView;
  */
 public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
 
-    private final TableView<T> tableView;
+    private TableView<T> tableView;
 
     private final Producer<T> producer;
 
+    private final PulsarClient client;
+
+    private final String topic;
+
+    private final Class<T> clazz;
+
     public TableViewLoadDataStoreImpl(PulsarClient client, String topic, 
Class<T> clazz) throws LoadDataStoreException {
         try {
-            this.tableView = 
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+            this.client = client;
             this.producer = 
client.newProducer(Schema.JSON(clazz)).topic(topic).create();
+            this.topic = topic;
+            this.clazz = clazz;
         } catch (Exception e) {
             throw new LoadDataStoreException(e);
         }
@@ -61,30 +70,59 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
 
     @Override
     public Optional<T> get(String key) {
+        validateTableViewStart();
         return Optional.ofNullable(tableView.get(key));
     }
 
     @Override
     public void forEach(BiConsumer<String, T> action) {
+        validateTableViewStart();
         tableView.forEach(action);
     }
 
     public Set<Map.Entry<String, T>> entrySet() {
+        validateTableViewStart();
         return tableView.entrySet();
     }
 
     @Override
     public int size() {
+        validateTableViewStart();
         return tableView.size();
     }
 
+    @Override
+    public void closeTableView() throws IOException {
+        if (tableView != null) {
+            tableView.close();
+            tableView = null;
+        }
+    }
+
+    @Override
+    public void startTableView() throws LoadDataStoreException {
+        if (tableView == null) {
+            try {
+                tableView = 
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+            } catch (PulsarClientException e) {
+                tableView = null;
+                throw new LoadDataStoreException(e);
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (producer != null) {
             producer.close();
         }
-        if (tableView != null) {
-            tableView.close();
+        closeTableView();
+    }
+
+    private void validateTableViewStart() {
+        if (tableView == null) {
+            throw new IllegalStateException("table view has not been started");
         }
     }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 17fcb995261..aa8583c6b57 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -35,6 +35,7 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
@@ -42,6 +43,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -49,6 +52,8 @@ import com.google.common.collect.Sets;
 import java.util.LinkedHashMap;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -74,12 +79,14 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateC
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -100,6 +107,7 @@ import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -143,22 +151,9 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
         pulsar2 = additionalPulsarTestContext.getPulsarService();
 
-        ExtensibleLoadManagerWrapper primaryLoadManagerWrapper =
-                (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
-        primaryLoadManager = spy((ExtensibleLoadManagerImpl)
-                FieldUtils.readField(primaryLoadManagerWrapper, "loadManager", 
true));
-        FieldUtils.writeField(primaryLoadManagerWrapper, "loadManager", 
primaryLoadManager, true);
-
-        ExtensibleLoadManagerWrapper secondaryLoadManagerWrapper =
-                (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
-        secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
-                FieldUtils.readField(secondaryLoadManagerWrapper, 
"loadManager", true));
-        FieldUtils.writeField(secondaryLoadManagerWrapper, "loadManager", 
secondaryLoadManager, true);
+        setPrimaryLoadManager();
 
-        channel1 = (ServiceUnitStateChannelImpl)
-                FieldUtils.readField(primaryLoadManager, 
"serviceUnitStateChannel", true);
-        channel2 = (ServiceUnitStateChannelImpl)
-                FieldUtils.readField(secondaryLoadManager, 
"serviceUnitStateChannel", true);
+        setSecondaryLoadManager();
 
         admin.clusters().createCluster(this.conf.getClusterName(),
                 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
@@ -412,6 +407,130 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(brokerLookupData.get().getWebServiceUrl(), 
pulsar2.getWebServiceAddress());
     }
 
+    /**
+     * Checks that the {@code tableView} field of {@code topBundlesLoadDataStore} is non-null only
+     * on the broker that owns the service unit state channel. The check runs once for the initial
+     * pair of brokers and once more after the primary broker has been restarted and its load
+     * manager has been re-wrapped by {@code setPrimaryLoadManager()}.
+     */
+    @Test
+    public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception {
+        // Snapshot the primary broker's store, channel and table view through reflection.
+        var primaryStore = (LoadDataStore)
+                FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
+        var primaryChannel = (ServiceUnitStateChannelImpl)
+                FieldUtils.readDeclaredField(primaryLoadManager, "serviceUnitStateChannel", true);
+        var primaryTableView = (TableViewImpl) FieldUtils.readDeclaredField(primaryStore, "tableView", true);
+
+        // Same snapshot for the secondary broker (its channel is not needed here).
+        var secondaryStore = (LoadDataStore)
+                FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true);
+        var secondaryTableView = (TableViewImpl) FieldUtils.readDeclaredField(secondaryStore, "tableView", true);
+
+        boolean primaryOwnsChannel = primaryChannel.isChannelOwnerAsync().get(5, TimeUnit.SECONDS);
+        if (!primaryOwnsChannel) {
+            assertNull(primaryTableView);
+            assertNotNull(secondaryTableView);
+        } else {
+            assertNotNull(primaryTableView);
+            assertNull(secondaryTableView);
+        }
+
+        // Restart the primary broker and pick up its freshly created load manager.
+        restartBroker();
+        pulsar1 = pulsar;
+        setPrimaryLoadManager();
+
+        var restartedPrimaryChannel = (ServiceUnitStateChannelImpl)
+                FieldUtils.readDeclaredField(primaryLoadManager, "serviceUnitStateChannel", true);
+        var restartedPrimaryStore = (LoadDataStore)
+                FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
+
+        // NOTE(review): Awaitility here appears to be the testcontainers-shaded class imported by
+        // this file; consider org.awaitility.Awaitility instead - confirm against the imports.
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            // The restarted primary must not own the channel, so only the secondary has a table view.
+            assertFalse(restartedPrimaryChannel.isChannelOwnerAsync().get(5, TimeUnit.SECONDS));
+            assertNotNull(FieldUtils.readDeclaredField(secondaryStore, "tableView", true));
+            assertNull(FieldUtils.readDeclaredField(restartedPrimaryStore, "tableView", true));
+        });
+    }
+
+    /**
+     * Drives both brokers through repeated leader/follower role changes while the
+     * {@code startTableView()} and {@code closeTableView()} calls of each broker's
+     * {@code topBundlesLoadDataStore} are stubbed to throw on their first two invocations, then
+     * verifies that each stubbed method was invoked exactly three times per store.
+     *
+     * <p>The original stores are written back in a {@code finally} block so that a failed call or
+     * verification does not leave the throwing spies installed on the load managers that other
+     * tests in this class reuse.
+     */
+    @Test
+    public void testRoleChange() throws Exception {
+        // Capture both original stores up front so they can always be restored.
+        var topBundlesLoadDataStorePrimary = (LoadDataStore<TopBundlesLoadData>)
+                FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
+        var topBundlesLoadDataStoreSecondary = (LoadDataStore<TopBundlesLoadData>)
+                FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true);
+
+        try {
+            // Each counter starts at 3, so the first two invocations throw and later ones return.
+            // NOTE(review): the original comment said "Call the real method", but the stubs only
+            // call reset() without arguments and return null; no real method is invoked - confirm
+            // whether invocationOnMock.callRealMethod() was intended.
+            var topBundlesLoadDataStorePrimarySpy = spy(topBundlesLoadDataStorePrimary);
+            AtomicInteger countPri = new AtomicInteger(3);
+            AtomicInteger countPri2 = new AtomicInteger(3);
+            doAnswer(invocationOnMock -> {
+                if (countPri.decrementAndGet() > 0) {
+                    throw new RuntimeException();
+                }
+                reset();
+                return null;
+            }).when(topBundlesLoadDataStorePrimarySpy).startTableView();
+            doAnswer(invocationOnMock -> {
+                if (countPri2.decrementAndGet() > 0) {
+                    throw new RuntimeException();
+                }
+                reset();
+                return null;
+            }).when(topBundlesLoadDataStorePrimarySpy).closeTableView();
+            FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
+                    topBundlesLoadDataStorePrimarySpy, true);
+
+            var topBundlesLoadDataStoreSecondarySpy = spy(topBundlesLoadDataStoreSecondary);
+            AtomicInteger countSec = new AtomicInteger(3);
+            AtomicInteger countSec2 = new AtomicInteger(3);
+            doAnswer(invocationOnMock -> {
+                if (countSec.decrementAndGet() > 0) {
+                    throw new RuntimeException();
+                }
+                reset();
+                return null;
+            }).when(topBundlesLoadDataStoreSecondarySpy).startTableView();
+            doAnswer(invocationOnMock -> {
+                if (countSec2.decrementAndGet() > 0) {
+                    throw new RuntimeException();
+                }
+                reset();
+                return null;
+            }).when(topBundlesLoadDataStoreSecondarySpy).closeTableView();
+            FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
+                    topBundlesLoadDataStoreSecondarySpy, true);
+
+            // Swap roles away from the current assignment and back again; every transition is
+            // requested twice in a row.
+            if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+                primaryLoadManager.playFollower();
+                primaryLoadManager.playFollower();
+                secondaryLoadManager.playLeader();
+                secondaryLoadManager.playLeader();
+                primaryLoadManager.playLeader();
+                primaryLoadManager.playLeader();
+                secondaryLoadManager.playFollower();
+                secondaryLoadManager.playFollower();
+            } else {
+                primaryLoadManager.playLeader();
+                primaryLoadManager.playLeader();
+                secondaryLoadManager.playFollower();
+                secondaryLoadManager.playFollower();
+                primaryLoadManager.playFollower();
+                primaryLoadManager.playFollower();
+                secondaryLoadManager.playLeader();
+                secondaryLoadManager.playLeader();
+            }
+
+            verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
+            verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
+            verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
+            verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
+        } finally {
+            // Always put the real stores back, even when a call or verification above failed.
+            FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
+                    topBundlesLoadDataStorePrimary, true);
+            FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
+                    topBundlesLoadDataStoreSecondary, true);
+        }
+    }
+
     @Test
     public void testGetMetrics() throws Exception {
         {
@@ -612,6 +731,26 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         cache.clear();
     }
 
+    /**
+     * Swaps the {@code loadManager} held by {@code pulsar1}'s load manager wrapper for a Mockito
+     * spy of it, stores the spy in {@code primaryLoadManager}, and caches the spy's
+     * {@code serviceUnitStateChannel} in {@code channel1}.
+     *
+     * @throws IllegalAccessException if a reflective field read or write is denied
+     */
+    private void setPrimaryLoadManager() throws IllegalAccessException {
+        var loadManagerWrapper = (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
+
+        var wrappedLoadManager = (ExtensibleLoadManagerImpl)
+                FieldUtils.readField(loadManagerWrapper, "loadManager", true);
+        primaryLoadManager = spy(wrappedLoadManager);
+        FieldUtils.writeField(loadManagerWrapper, "loadManager", primaryLoadManager, true);
+
+        Object stateChannel = FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
+        channel1 = (ServiceUnitStateChannelImpl) stateChannel;
+    }
+
+    /**
+     * Swaps the {@code loadManager} held by {@code pulsar2}'s load manager wrapper for a Mockito
+     * spy of it, stores the spy in {@code secondaryLoadManager}, and caches the spy's
+     * {@code serviceUnitStateChannel} in {@code channel2}.
+     *
+     * @throws IllegalAccessException if a reflective field read or write is denied
+     */
+    private void setSecondaryLoadManager() throws IllegalAccessException {
+        var loadManagerWrapper = (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
+
+        var wrappedLoadManager = (ExtensibleLoadManagerImpl)
+                FieldUtils.readField(loadManagerWrapper, "loadManager", true);
+        secondaryLoadManager = spy(wrappedLoadManager);
+        FieldUtils.writeField(loadManagerWrapper, "loadManager", secondaryLoadManager, true);
+
+        Object stateChannel = FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
+        channel2 = (ServiceUnitStateChannelImpl) stateChannel;
+    }
+
     /**
      * Returns the future produced by the given broker's namespace service when asked for the
      * bundle of {@code topic}.
      */
     private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
         var namespaceService = pulsar.getNamespaceService();
         return namespaceService.getBundleAsync(topic);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 64a5f63196b..e0bd69fce64 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
@@ -1497,6 +1498,20 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         doReturn(registry).when(channel).getBrokerRegistry();
         doReturn(brokerSelector).when(channel).getBrokerSelector();
 
+
+        var leaderElectionService = new LeaderElectionService(
+                pulsar.getCoordinationService(), 
pulsar.getSafeWebServiceAddress(),
+                state -> {
+                    if (state == LeaderElectionState.Leading) {
+                        channel.scheduleOwnershipMonitor();
+                    } else {
+                        channel.cancelOwnershipMonitor();
+                    }
+                });
+        leaderElectionService.start();
+
+        
doReturn(leaderElectionService).when(channel).getLeaderElectionService();
+
         return channel;
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
index 3de957c3b1a..c521861471c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
@@ -34,6 +34,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 
 public class BrokerFilterTestBase {
@@ -82,6 +83,16 @@ public class BrokerFilterTestBase {
             public int size() {
                 return map.size();
             }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to close.
+            @Override
+            public void closeTableView() throws IOException {
+
+            }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to start.
+            @Override
+            public void startTableView() throws LoadDataStoreException {
+
+            }
         };
         configuration.setPreferLaterVersions(true);
         doReturn(configuration).when(mockContext).brokerConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 47bf1ad2e7e..3955f1ed9af 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -68,6 +68,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 import 
org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
 import 
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
@@ -267,6 +268,16 @@ public class TransferShedderTest {
             public int size() {
                 return map.size();
             }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to close.
+            @Override
+            public void closeTableView() throws IOException {
+
+            }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to start.
+            @Override
+            public void startTableView() throws LoadDataStoreException {
+
+            }
         };
 
         var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
@@ -310,6 +321,16 @@ public class TransferShedderTest {
             public int size() {
                 return map.size();
             }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to close.
+            @Override
+            public void closeTableView() throws IOException {
+
+            }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to start.
+            @Override
+            public void startTableView() throws LoadDataStoreException {
+
+            }
         };
 
         BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 5e2924cd842..184c337a47c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.store;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.AssertJUnit.assertTrue;
 
 import com.google.common.collect.Sets;
@@ -74,6 +75,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         @Cleanup
         LoadDataStore<MyClass> loadDataStore =
                 LoadDataStoreFactory.create(pulsar.getClient(), topic, 
MyClass.class);
+        loadDataStore.startTableView();
         MyClass myClass1 = new MyClass("1", 1);
         loadDataStore.pushAsync("key1", myClass1).get();
 
@@ -106,6 +108,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         @Cleanup
         LoadDataStore<Integer> loadDataStore =
                 LoadDataStoreFactory.create(pulsar.getClient(), topic, 
Integer.class);
+        loadDataStore.startTableView();
 
         Map<String, Integer> map = new HashMap<>();
         for (int i = 0; i < 10; i++) {
@@ -124,4 +127,28 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(loadDataStore.entrySet(), map.entrySet());
     }
 
+    /**
+     * Verifies that a load data store can close and restart its table view: reads work while the
+     * view is started, {@code get} throws {@link IllegalStateException} while it is closed, and a
+     * value pushed during the closed period becomes visible once the view is started again.
+     */
+    @Test
+    public void testTableViewRestart() throws Exception {
+        String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+        // Release the store when the test exits (pass or fail), as the sibling tests in this class do.
+        @Cleanup
+        LoadDataStore<Integer> loadDataStore =
+                LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+
+        loadDataStore.startTableView();
+        loadDataStore.pushAsync("1", 1).get();
+        Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.size(), 1));
+        assertEquals(loadDataStore.get("1").get(), 1);
+        loadDataStore.closeTableView();
+
+        // Pushing is still expected to succeed while the table view is closed; reading is not.
+        loadDataStore.pushAsync("1", 2).get();
+        Exception ex = null;
+        try {
+            loadDataStore.get("1");
+        } catch (IllegalStateException e) {
+            ex = e;
+        }
+        assertNotNull(ex, "get() should throw IllegalStateException while the table view is closed");
+
+        // After a restart the view must catch up with the value written while it was closed.
+        loadDataStore.startTableView();
+        Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 2));
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index ebc2424cada..0eea1d87513 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicName;
@@ -244,6 +246,16 @@ public class LeastResourceUsageWithWeightTest {
             public int size() {
                 return map.size();
             }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to close.
+            @Override
+            public void closeTableView() throws IOException {
+
+            }
+
+            // Intentionally empty: this anonymous test store reports its size from a local map
+            // (see size() above), so presumably it has no table view to start.
+            @Override
+            public void startTableView() throws LoadDataStoreException {
+
+            }
         };
 
         doReturn(conf).when(ctx).brokerConfiguration();


Reply via email to