This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce102dac329 [fix][broker]Data lost due to conflict loaded up a topic 
for two brokers, when enabled ServiceUnitStateMetadataStoreTableViewImpl 
(#24478)
ce102dac329 is described below

commit ce102dac32978740b8dd7ddb1598ce3263ef6202
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 9 10:12:38 2025 +0800

    [fix][broker]Data lost due to conflict loaded up a topic for two brokers, 
when enabled ServiceUnitStateMetadataStoreTableViewImpl (#24478)
---
 .../channel/ServiceUnitStateChannelImpl.java       |  23 ++-
 ...ServiceUnitStateMetadataStoreTableViewImpl.java |  30 ++-
 .../channel/ServiceUnitStateTableView.java         |  15 +-
 .../channel/ServiceUnitStateTableViewBase.java     |   7 +-
 .../channel/ServiceUnitStateTableViewImpl.java     |   9 +-
 .../channel/ServiceUnitStateTableViewSyncer.java   |   4 +
 .../ExtensibleLoadManagerImplBaseTest.java         |   8 +
 .../channel/ServiceUnitStateChannelTest.java       |  38 +++-
 .../broker/service/NetworkErrorTestBase.java       |  20 ++
 .../pulsar/broker/service/ZkSessionExpireTest.java | 146 ++++++++++++---
 .../tableview/impl/MetadataStoreTableViewImpl.java | 206 ++++++++++++++++-----
 11 files changed, 419 insertions(+), 87 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 3e2a9c7468d..ecd98f188b4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -315,7 +315,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                             
pulsar.getConfiguration().getDefaultNumberOfNamespaceBundles());
 
             tableview = createServiceUnitStateTableView();
-            tableview.start(pulsar, this::handleEvent, this::handleExisting);
+            tableview.start(pulsar, this::handleEvent, this::handleExisting, 
this::handleInvalidate);
 
             if (debug) {
                 log.info("Successfully started the channel tableview.");
@@ -465,7 +465,12 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         // we return the owner without its activeness check.
         // This broker tries to serve lookups on a best efforts basis when 
metadata store connection is unstable.
         if (!brokerRegistry.isRegistered()) {
-            return CompletableFuture.completedFuture(owner);
+            if (tableview.isMetadataStoreBased()) {
+                return FutureUtil.failedFuture(new 
MetadataStoreException("broker is unavailable so far because it is"
+                    + " in the state that tries to reconnect to the metadata 
store."));
+            } else {
+                return CompletableFuture.completedFuture(owner);
+            }
         }
 
         return dedupeGetOwnerRequest(serviceUnit)
@@ -736,6 +741,20 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
+    /***
+     * Called when the {@link #tableview} cannot determine the ownership of the service unit.
+     * This often happens when the current broker cannot connect to the other components (e.g. the metadata store).
+     */
+    private void handleInvalidate(String serviceUnit, ServiceUnitStateData 
data) {
+        closeServiceUnit(serviceUnit, true).whenComplete((__, ex) -> {
+            if (ex == null) {
+                log.info("Unloaded serviceUnit:{} because the ownership is 
invalidate", serviceUnit);
+                return;
+            }
+            log.error("Failed to unload serviceUnit:{} after the ownership is 
invalidate", serviceUnit, ex);
+        });
+    }
+
     private static boolean isTransferCommand(ServiceUnitStateData data) {
         if (data == null) {
             return false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
index f488b31c774..f70fbbb45c9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
@@ -31,10 +31,12 @@ import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MetadataSessionExpiredPolicy;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreTableView;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.metadata.tableview.impl.MetadataStoreTableViewImpl;
 
 @Slf4j
@@ -54,13 +56,23 @@ public class ServiceUnitStateMetadataStoreTableViewImpl 
extends ServiceUnitState
     private ServiceUnitStateDataConflictResolver conflictResolver;
     private volatile MetadataStoreTableView<ServiceUnitStateData> tableview;
 
+    @Override
     public void start(PulsarService pulsar,
                       BiConsumer<String, ServiceUnitStateData> 
tailItemListener,
-                      BiConsumer<String, ServiceUnitStateData> 
existingItemListener)
+                      BiConsumer<String, ServiceUnitStateData> 
existingItemListener,
+                      BiConsumer<String, ServiceUnitStateData> 
outdatedItemListeners)
             throws MetadataStoreException {
         init(pulsar);
         conflictResolver = new ServiceUnitStateDataConflictResolver();
         conflictResolver.setStorageType(MetadataStore);
+        if (!(pulsar.getLocalMetadataStore() instanceof AbstractMetadataStore)
+            && 
!MetadataSessionExpiredPolicy.shutdown.equals(pulsar.getConfig().getZookeeperSessionExpiredPolicy()))
 {
+            String errorMsg = String.format("Your current metadata store [%s] 
does not support the registration of "
+                    + "session event listeners. Please set 
\"zookeeperSessionExpiredPolicy\" to \"shutdown\";"
+                    + " otherwise, you will encounter the issue that messages 
lost because of conflicted topic loading",
+                    pulsar.getLocalMetadataStore().getClass().getName());
+            log.warn(errorMsg);
+        }
         tableview = new 
MetadataStoreTableViewImpl<>(ServiceUnitStateData.class,
                 pulsar.getBrokerId(),
                 pulsar.getLocalMetadataStore(),
@@ -69,12 +81,21 @@ public class ServiceUnitStateMetadataStoreTableViewImpl 
extends ServiceUnitState
                 this::validateServiceUnitPath,
                 List.of(this::updateOwnedServiceUnits, tailItemListener),
                 List.of(this::updateOwnedServiceUnits, existingItemListener),
-                
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds())
+                List.of(this::invalidateOwnedServiceUnits, 
outdatedItemListeners),
+                true,
+                
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds()),
+                t -> handleTableViewShutDownEvent(t)
         );
         tableview.start();
 
     }
 
+    protected void handleTableViewShutDownEvent(Throwable throwable) {
+        log.error("The component of load-balance, which named metadata store 
table view has shutdown. This Broker can"
+                    + " not work anymore, start tp shutdow,");
+        pulsar.shutdownNow();
+    }
+
     protected boolean resolveConflict(ServiceUnitStateData prev, 
ServiceUnitStateData cur) {
         return !conflictResolver.shouldKeepLeft(prev, cur);
     }
@@ -131,6 +152,11 @@ public class ServiceUnitStateMetadataStoreTableViewImpl 
extends ServiceUnitState
         // no-op
     }
 
+    @Override
+    public boolean isMetadataStoreBased() {
+        return true;
+    }
+
     @Override
     public CompletableFuture<Void> delete(String key) {
         if (!isValidState()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
index 5ac57fe5c19..e75bbfeb1a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
@@ -47,10 +47,18 @@ public interface ServiceUnitStateTableView extends 
Closeable {
      * @param tailItemListener listener to listen tail(newly updated) items
      * @param existingItemListener listener to listen existing items
      * @throws IOException if it fails to init the tableview.
+     * @param itemOutdatedListeners listener that is notified when the table view can no longer guarantee that the
+     *      values it holds are correct. Normally the table view keeps values correct in two ways: (1) it tries its
+     *      best to always hold the latest value; (2) if a stale value is read anyway, a later update event is
+     *      delivered, so the data eventually becomes accurate. This listener is called when neither guarantee can
+     *      be kept, e.g. because the metadata store session of the table view expired. Afterwards,
+     *      {@code tailItemListener} and {@code existingItemListener} are notified again, indicating that the
+     *      table view can once more provide the two guarantees above.
      */
     void start(PulsarService pulsar,
                BiConsumer<String, ServiceUnitStateData> tailItemListener,
-               BiConsumer<String, ServiceUnitStateData> existingItemListener) 
throws IOException;
+               BiConsumer<String, ServiceUnitStateData> existingItemListener,
+               BiConsumer<String, ServiceUnitStateData> itemOutdatedListeners) 
throws IOException;
 
 
     /**
@@ -110,4 +118,9 @@ public interface ServiceUnitStateTableView extends 
Closeable {
      * @throws TimeoutException
      */
     void flush(long waitDurationInMillis) throws ExecutionException, 
InterruptedException, TimeoutException;
+
+    /**
+     * Whether it depends on the local metadata store.
+     */
+    boolean isMetadataStoreBased();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
index b690ef101e1..a236de89f0e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
@@ -43,7 +43,7 @@ abstract class ServiceUnitStateTableViewBase implements 
ServiceUnitStateTableVie
     private final Map<NamespaceBundle, Boolean> ownedServiceUnitsMap = new 
ConcurrentHashMap<>();
     private final Set<NamespaceBundle> ownedServiceUnits = 
Collections.unmodifiableSet(ownedServiceUnitsMap.keySet());
     private String brokerId;
-    private PulsarService pulsar;
+    protected PulsarService pulsar;
     protected void init(PulsarService pulsar) throws MetadataStoreException {
         this.pulsar = pulsar;
         this.brokerId = pulsar.getBrokerId();
@@ -89,4 +89,9 @@ abstract class ServiceUnitStateTableViewBase implements 
ServiceUnitStateTableVie
             }
         });
     }
+
+    protected void invalidateOwnedServiceUnits(String key, 
ServiceUnitStateData outdatedVal) {
+        NamespaceBundle namespaceBundle = 
LoadManagerShared.getNamespaceBundle(pulsar, key);
+        ownedServiceUnitsMap.compute(namespaceBundle, (k, v) -> false);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
index 12cf87445a3..d1ac87fdf3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
@@ -51,9 +51,11 @@ public class ServiceUnitStateTableViewImpl extends 
ServiceUnitStateTableViewBase
     private volatile Producer<ServiceUnitStateData> producer;
     private volatile TableView<ServiceUnitStateData> tableview;
 
+    @Override
     public void start(PulsarService pulsar,
                       BiConsumer<String, ServiceUnitStateData> 
tailItemListener,
-                      BiConsumer<String, ServiceUnitStateData> 
existingItemListener) throws IOException {
+                      BiConsumer<String, ServiceUnitStateData> 
existingItemListener,
+                      BiConsumer<String, ServiceUnitStateData> 
itemOutdatedListeners) throws IOException {
         boolean debug = 
ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
 
         init(pulsar);
@@ -175,6 +177,11 @@ public class ServiceUnitStateTableViewImpl extends 
ServiceUnitStateTableViewBase
         tableview.refreshAsync().get(waitTimeMs, MILLISECONDS);
     }
 
+    @Override
+    public boolean isMetadataStoreBased() {
+        return false;
+    }
+
     @Override
     public CompletableFuture<Void> delete(String key) {
         return put(key, null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
index 10ab39a66d2..45ff0dcb267 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
@@ -100,6 +100,7 @@ public class ServiceUnitStateTableViewSyncer implements 
Closeable {
         metadataStoreTableView.start(
                 pulsar,
                 this::dummy,
+                this::dummy,
                 this::dummy
         );
 
@@ -108,6 +109,7 @@ public class ServiceUnitStateTableViewSyncer implements 
Closeable {
         systemTopicTableView.start(
                 pulsar,
                 this::dummy,
+                this::dummy,
                 this::dummy
         );
 
@@ -152,6 +154,7 @@ public class ServiceUnitStateTableViewSyncer implements 
Closeable {
         this.metadataStoreTableView.start(
                 pulsar,
                 this::syncToSystemTopic,
+                this::dummy,
                 this::dummy
         );
         log.info("Started MetadataStoreTableView");
@@ -160,6 +163,7 @@ public class ServiceUnitStateTableViewSyncer implements 
Closeable {
         this.systemTopicTableView.start(
                 pulsar,
                 this::syncToMetadataStore,
+                this::dummy,
                 this::dummy
         );
         log.info("Started SystemTopicTableView");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index bb224cdf7c4..fff5761eb48 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.MetadataSessionExpiredPolicy;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,6 +79,13 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
 
     protected String serviceUnitStateTableViewClassName;
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        
conf.setZookeeperSessionExpiredPolicy(MetadataSessionExpiredPolicy.shutdown);
+        return conf;
+    }
+
     protected ArrayList<PulsarClient> clients = new ArrayList<>();
 
     @DataProvider(name = "serviceUnitStateTableViewClassName")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 63d6bbb652a..bbde38bfbec 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreTableView;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -1791,15 +1792,34 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertTrue(ex.getCause() instanceof IllegalStateException);
         assertTrue(System.currentTimeMillis() - start >= 1000);
 
-        try {
-            // verify getOwnerAsync returns immediately when not registered
-            registry.unregister();
-            start = System.currentTimeMillis();
-            assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
-            elapsed = System.currentTimeMillis() - start;
-            assertTrue(elapsed < 1000);
-        } finally {
-            registry.registerAsync().join();
+        if 
(pulsar1.getConfig().getLoadManagerServiceUnitStateTableViewClassName()
+            .equals(ServiceUnitStateTableViewImpl.class.getName())) {
+            try {
+                // verify getOwnerAsync returns immediately when not registered
+                registry.unregister();
+                start = System.currentTimeMillis();
+                assertEquals(broker, 
channel1.getOwnerAsync(bundle).get().get());
+                elapsed = System.currentTimeMillis() - start;
+                assertTrue(elapsed < 1000);
+            } finally {
+                registry.registerAsync().join();
+            }
+        }
+
+        if 
(pulsar1.getConfig().getLoadManagerServiceUnitStateTableViewClassName()
+                
.equals(ServiceUnitStateMetadataStoreTableViewImpl.class.getName())) {
+            try {
+                // verify getOwnerAsync returns immediately when not registered
+                registry.unregister();
+                channel1.getOwnerAsync(bundle).get().get();
+                fail("Request should fail because it is in the state that 
tries to reconnect to the metadata store");
+            } catch (Exception e) {
+                Throwable actEx = FutureUtil.unwrapCompletionException(e);
+                assertTrue(actEx instanceof MetadataStoreException);
+                assertTrue(actEx.getMessage().contains("reconnect to the 
metadata store."));
+            } finally {
+                registry.registerAsync().join();
+            }
         }
 
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
index 0161a4a63cf..4a0c73b4e00 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -23,17 +23,20 @@ import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -301,6 +304,23 @@ public abstract class NetworkErrorTestBase extends 
TestRetrySupport {
         }
     }
 
+    public static Collection<String> getOwnedBundles(PulsarService pulsar) {
+        Object loadManagerWrapper = pulsar.getLoadManager().get();
+        Object loadManager = WhiteboxImpl.getInternalState(loadManagerWrapper, 
"loadManager");
+        if (loadManager instanceof ModularLoadManagerImpl) {
+            return 
pulsar.getNamespaceService().getOwnershipCache().getOwnedBundles()
+                .keySet().stream().map(k -> k.getNamespaceObject().toString() 
+ "/" + k.getBundleRange())
+                .collect(Collectors.toList());
+        } else if (loadManager instanceof ExtensibleLoadManagerImpl 
extensibleLoadManager) {
+            ServiceUnitStateChannel serviceUnitStateChannel = 
extensibleLoadManager.getServiceUnitStateChannel();
+            return serviceUnitStateChannel.getOwnedServiceUnits().stream()
+                .map(k -> k.getNamespaceObject().toString() + "/" + 
k.getBundleRange())
+                .collect(Collectors.toList());
+        } else {
+            throw new RuntimeException("Not support for the load manager: " + 
loadManager.getClass().getName());
+        }
+    }
+
     public void clearPreferBroker() {
         preferBroker.set(null);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
index b05dae2c3c2..339e5a88786 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
@@ -19,22 +19,34 @@
 package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableView;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreTableView;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -64,22 +76,45 @@ public class ZkSessionExpireTest extends 
NetworkErrorTestBase {
     @DataProvider(name = "settings")
     public Object[][] settings() {
         return new Object[][]{
-            {false, NetworkErrorTestBase.PreferBrokerModularLoadManager.class},
-            {true, NetworkErrorTestBase.PreferBrokerModularLoadManager.class}
+//            {false, 
NetworkErrorTestBase.PreferBrokerModularLoadManager.class, null},
+//            {true, 
NetworkErrorTestBase.PreferBrokerModularLoadManager.class, null},
+            {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class, 
TableViewType.MetadataStore},
             // Create a separate PR to add this test case.
-            // {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class}.
+            // {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class, 
TableViewType.SystemTopic}
         };
     }
 
-    @Test(timeOut = 600 * 1000, dataProvider = "settings")
-    public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, 
Class loadManager) throws Exception {
+    private enum TableViewType {
+        SystemTopic,
+        MetadataStore;
+    }
+
+    @Test(timeOut = 180 * 1000, dataProvider = "settings")
+    public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, 
Class loadManager,
+                                                   TableViewType 
tableViewType) throws Exception {
         // Setup.
         setupWithSettings(config -> {
             config.setManagedLedgerMaxEntriesPerLedger(1);
             config.setSystemTopicEnabled(enableSystemTopic);
             config.setTopicLevelPoliciesEnabled(enableSystemTopic);
-            if (loadManager != null) {
-                config.setLoadManagerClassName(loadManager.getName());
+            // Set load manager.
+            if (loadManager == null) {
+                return;
+            }
+            config.setLoadManagerClassName(loadManager.getName());
+            if (loadManager.getSimpleName().equals("ModularLoadManagerImpl")) {
+                return;
+            }
+            // Set params of metadata store table view.
+            if 
(loadManager.getSimpleName().equals("PreferExtensibleLoadManager")) {
+                if (tableViewType == TableViewType.MetadataStore) {
+                    
config.setLoadManagerServiceUnitStateTableViewClassName("org.apache.pulsar.broker.loadbalance"
+                            + 
".extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl");
+                    
config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.extensions"
+                            + ".scheduler.TransferShedder");
+                    config.setLoadBalancerSheddingEnabled(true);
+                    config.setLoadBalancerTransferEnabled(true);
+                }
             }
         });
 
@@ -104,6 +139,8 @@ public class ZkSessionExpireTest extends 
NetworkErrorTestBase {
         ProducerImpl<String> p1 = (ProducerImpl<String>) 
client1.newProducer(Schema.STRING).topic(topicName)
                 .sendTimeout(10, TimeUnit.SECONDS).create();
         Topic broker1Topic1 = pulsar1.getBrokerService().getTopic(topicName, 
false).join().get();
+        final String brokerAddr1 = admin1.lookups().lookupTopic(topicName);
+        log.info("the addr of broker 1: {}", brokerAddr1);
         assertNotNull(broker1Topic1);
         clearPreferBroker();
 
@@ -139,13 +176,24 @@ public class ZkSessionExpireTest extends 
NetworkErrorTestBase {
         // Publishing to broker-2 will success.
         CompletableFuture<MessageId> broker1Send2 = 
p1.sendAsync("broker1_msg2");
         CompletableFuture<MessageId> broker2Send2 = 
p2.sendAsync("broker2_msg2");
+        admin1.brokers().getActiveBrokers();
         try {
             broker1Send1.join();
             broker1Send2.join();
             p1.getClientCnx();
-            fail("expected a publish error");
+            // Since the system-topic-based load balancer does not rely on ZK, publishing can succeed.
+            if (loadManager.getSimpleName().equals("ModularLoadManagerImpl")
+                    || TableViewType.MetadataStore.equals(tableViewType)) {
+                fail("Since the client can not get a correct response of 
lookup from the broker who can not connect to"
+                    + " metadata store, it should failed to publish");
+            }
         } catch (Exception ex) {
-            // Expected.
+            // Since topic transfer with the system-topic-based load balancer does not rely on ZK, both publishes can
+            // succeed.
+            if (TableViewType.SystemTopic.equals(tableViewType)) {
+                fail("Since topic transferring of system topic base load 
balancer does not rely on ZK, the both publish"
+                    + " should succeed");
+            }
         }
         broker2Send1.join();
         broker2Send2.join();
@@ -153,29 +201,63 @@ public class ZkSessionExpireTest extends 
NetworkErrorTestBase {
         // Broker rebuild ZK session.
         metadataZKProxy.unRejectAllConnections();
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(getAvailableBrokers(pulsar1).size(), 2);
-            assertEquals(getAvailableBrokers(pulsar2).size(), 2);
+            Set<String> availableBrokers1 = getAvailableBrokers(pulsar1);
+            Set<String> availableBrokers2 = getAvailableBrokers(pulsar1);
+            log.info("Available brokers 1: {}", availableBrokers1);
+            log.info("Available brokers 2: {}", availableBrokers2);
+            assertEquals(availableBrokers1.size(), 2);
+            assertEquals(availableBrokers1.size(), 2);
         });
 
         // Verify: the topic on broker-1 will be unloaded.
         // Verify: the topic on broker-2 is fine.
-        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
-            CompletableFuture<Optional<Topic>> future = 
pulsar1.getBrokerService().getTopic(topicName, false);
+        Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> 
{
+            CompletableFuture<Optional<Topic>> future1 = 
pulsar1.getBrokerService().getTopic(topicName, false);
             log.info("broker 1 topics {}", 
pulsar1.getBrokerService().getTopics().keySet());
             log.info("broker 2 topics {}", 
pulsar2.getBrokerService().getTopics().keySet());
-            log.info("broker 1 bundles {}", 
pulsar1.getNamespaceService().getOwnershipCache().getOwnedBundles()
-                    .keySet().stream().map(k -> 
k.getNamespaceObject().toString() + "/" + k.getBundleRange())
+            log.info("broker 1 bundles {}", getOwnedBundles(pulsar1).stream()
                     .filter(s -> 
s.contains(defaultNamespace)).collect(Collectors.toList()));
-            log.info("broker 2 bundles {}", 
pulsar2.getNamespaceService().getOwnershipCache().getOwnedBundles()
-                    .keySet().stream().map(k -> 
k.getNamespaceObject().toString() + "/" + k.getBundleRange())
+            log.info("broker 2 bundles {}", getOwnedBundles(pulsar1).stream()
                     .filter(s -> 
s.contains(defaultNamespace)).collect(Collectors.toList()));
-            log.info("future: {}, isDone: {}, isCompletedExceptionally: {}",
-                    future, future == null ? "null" : future.isDone(),
-                    future, future == null ? "null" : 
future.isCompletedExceptionally());
-            assertTrue(future == null
+            String lookup1 = admin1.lookups().lookupTopic(topicName);
+            String lookup2 = admin2.lookups().lookupTopic(topicName);
+            log.info("lookup 1: {}", lookup1);
+            log.info("lookup 2: {}", lookup2);
+            // Both responses from different brokers should be the same.
+            assertEquals(lookup1, lookup2, "both lookup result should be the 
same");
+            // Except the system topic based load balancer, the topic should 
be loaded up on the broker-2.
+            log.info("future 1: {}, isDone: {}, isCompletedExceptionally: {}",
+                    future1, future1 == null ? "null" : future1.isDone(),
+                    future1 == null ? "null" : 
future1.isCompletedExceptionally());
+            boolean topicDoesNotExists1 = future1 == null
                     || 
!pulsar1.getBrokerService().getTopics().containsKey(topicName)
-                    || (future.isDone() && !future.isCompletedExceptionally() 
&& future.get().isEmpty())
-                    || future.isCompletedExceptionally());
+                    || (future1.isDone() && 
!future1.isCompletedExceptionally() && future1.get().isEmpty())
+                    || future1.isCompletedExceptionally();
+            if (loadManager.getSimpleName().equals("ModularLoadManagerImpl")
+                    || TableViewType.MetadataStore.equals(tableViewType)) {
+                assertNotEquals(lookup1, brokerAddr1, "lookup result should be 
broker-2");
+                assertNotEquals(lookup2, brokerAddr1, "lookup result should be 
broker-2");
+                assertTrue(topicDoesNotExists1, "topic should be unloaded from 
the broker-1");
+                List<String> bundle1 = getOwnedBundles(pulsar1).stream()
+                        .filter(s -> 
s.contains(defaultNamespace)).collect(Collectors.toList());
+                List<String> bundle2 = getOwnedBundles(pulsar2).stream()
+                        .filter(s -> 
s.contains(defaultNamespace)).collect(Collectors.toList());
+                log.info("broker 1 bundles the second time {}", bundle1);
+                log.info("broker 2 bundles the second time {}", bundle2);
+                assertEquals(bundle1.size(), 0);
+                assertEquals(bundle2.size(), 1);
+                if (TableViewType.MetadataStore.equals(tableViewType)) {
+                    
assertEquals(getMetadataStoreBasedInternalOwnedBundles(pulsar1, 
brokerAddr1).stream()
+                            .filter(s -> 
s.contains(defaultNamespace)).collect(Collectors.toList()).size(), 0);
+                    
assertEquals(getMetadataStoreBasedInternalOwnedBundles(pulsar2, 
lookup2).stream()
+                            .filter(s -> 
s.contains(defaultNamespace)).collect(Collectors.toList()).size(), 0);
+                }
+            }
+            // Regarding the system topic based load balancer, which does not 
rely on ZK, the topic may be owned by any
+            // broker.
+            if (TableViewType.SystemTopic.equals(tableViewType)) {
+                // This implementation will be finished by a seperated PR.
+            }
         });
         Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName, 
false).join().get();
         assertNotNull(broker2Topic3);
@@ -183,6 +265,10 @@ public class ZkSessionExpireTest extends NetworkErrorTestBase {
         // Send messages continuously.
         // Verify: p1.send will success(it will connect to broker-2).
         // Verify: p2.send will success.
+        // Wait until two producers (p1 and p2) are registered on the topic of broker-2 before sending.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(broker2Topic3.getProducers().size(), 2);
+        });
         CompletableFuture<MessageId> broker1Send3 = p1.sendAsync("broker1_msg3");
         CompletableFuture<MessageId> broker2Send3 = p2.sendAsync("broker2_msg3");
         broker1Send3.join();
@@ -196,4 +282,26 @@ public class ZkSessionExpireTest extends NetworkErrorTestBase {
         p2.close();
         admin2.topics().delete(topicName, false);
     }
+
+    /**
+     * Reads the private cache of the metadata-store based table view behind {@code pulsar}'s load manager (via
+     * WhiteboxImpl) and returns the keys whose value is in state "Owned" with {@code brokerUrl} as destination
+     * broker.
+     */
+    private Collection<String> getMetadataStoreBasedInternalOwnedBundles(PulsarService pulsar, String brokerUrl) {
+        Object loadManagerWrapper = pulsar.getLoadManager().get();
+        // Not named "loadManager": in this test that name refers to the load manager class under test.
+        ExtensibleLoadManagerImpl extensibleLoadManager =
+                WhiteboxImpl.getInternalState(loadManagerWrapper, "loadManager");
+        ServiceUnitStateChannel serviceUnitStateChannel = extensibleLoadManager.getServiceUnitStateChannel();
+        ServiceUnitStateTableView tableview = WhiteboxImpl.getInternalState(serviceUnitStateChannel, "tableview");
+        MetadataStoreTableView<ServiceUnitStateData> tableviewInternal =
+                WhiteboxImpl.getInternalState(tableview, "tableview");
+        ConcurrentMap<String, ServiceUnitStateData> data = WhiteboxImpl.getInternalState(tableviewInternal, "data");
+        return data.entrySet().stream()
+                .filter(e -> brokerUrl.equals(e.getValue().dstBroker())
+                        && e.getValue().state().toString().equals("Owned"))
+                .map(e -> e.getKey())
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
index a2a5e4937a1..c06fbe3cc07 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.metadata.tableview.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -29,16 +30,19 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import lombok.Builder;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.CacheGetResult;
@@ -48,12 +52,14 @@ import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreTableView;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.jspecify.annotations.Nullable;
 
 @Slf4j
 public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T> {
 
     private static final int FILL_TIMEOUT_IN_MILLIS = 300_000;
-    private static final int MAX_CONCURRENT_METADATA_OPS_DURING_FILL = 50;
     private static final long CACHE_REFRESH_FREQUENCY_IN_MILLIS = 600_000;
     private final ConcurrentMap<String, T> data;
     private final Map<String, T> immutableData;
@@ -64,8 +70,26 @@ public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T>
     private final BiPredicate<T, T> conflictResolver;
     private final List<BiConsumer<String, T>> tailItemListeners;
     private final List<BiConsumer<String, T>> existingItemListeners;
+    private final List<BiConsumer<String, T>> outdatedItemListeners;
+    private final boolean clearUntrustedData;
     private final long timeoutInMillis;
     private final String pathPrefix;
+    private final Consumer<Throwable> tableViewShutDownListener;
+
+    /** @deprecated Use the full constructor (or {@code builder()}, which is generated from that constructor). */
+    @Deprecated
+    public MetadataStoreTableViewImpl(@NonNull Class<T> clazz,
+                                      @NonNull String name,
+                                      @NonNull MetadataStore store,
+                                      @NonNull String pathPrefix,
+                                      @NonNull BiPredicate<T, T> conflictResolver,
+                                      Predicate<String> listenPathValidator,
+                                      List<BiConsumer<String, T>> tailItemListeners,
+                                      List<BiConsumer<String, T>> existingItemListeners,
+                                      long timeoutInMillis) {
+        this(clazz, name, store, pathPrefix, conflictResolver, listenPathValidator, tailItemListeners,
+                existingItemListeners, null, false, timeoutInMillis, null);
+    }
 
     /**
      * Construct MetadataStoreTableViewImpl.
@@ -79,7 +103,20 @@ public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T>
      * @param tailItemListeners     listener for tail item(recently updated) notifications
      * @param existingItemListeners listener for existing items in metadata store
      * @param timeoutInMillis       timeout duration for each sync operation.
-     * @throws MetadataStoreException if init fails.
+     * @param outdatedItemListeners listeners notified for every cached item when the metadata store session
+     *        is lost, because the cached values can then no longer be trusted. While the session is healthy,
+     *        a stale value is expected to be replaced later through the regular update notifications. Once the
+     *        session is re-established, the existing items are reloaded as during start-up. If null or empty,
+     *        metadata store session events are ignored. Session events are only subscribed to automatically
+     *        when {@code store} is an {@link AbstractMetadataStore}.
+     * @param clearUntrustedData    whether to remove each item from the local cache once it has been reported
+     *        to {@code outdatedItemListeners}, so that readers get no value rather than an untrusted one. This
+     *        removal is not reported to {@code tailItemListeners}.
+     * @param tableViewShutDownListener optional; invoked with the cause when an outdated-item listener
+     *        throws, or when reloading the existing items after the session is re-established fails. At that
+     *        point the table view can no longer keep its data current, and restarting it is recommended.
+     *        Only used when {@code outdatedItemListeners} is non-empty, since session events are ignored
+     *        otherwise.
      */
     @Builder
     public MetadataStoreTableViewImpl(@NonNull Class<T> clazz,
@@ -90,7 +127,10 @@ public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T>
                                       Predicate<String> listenPathValidator,
                                       List<BiConsumer<String, T>> tailItemListeners,
                                       List<BiConsumer<String, T>> existingItemListeners,
-                                      long timeoutInMillis) {
+                                      @Nullable List<BiConsumer<String, T>> outdatedItemListeners,
+                                      boolean clearUntrustedData,
+                                      long timeoutInMillis,
+                                      @Nullable Consumer<Throwable> tableViewShutDownListener) {
         this.name = name;
         this.data = new ConcurrentHashMap<>();
         this.immutableData = Collections.unmodifiableMap(data);
@@ -105,8 +145,14 @@ public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T>
         if (existingItemListeners != null) {
             this.existingItemListeners.addAll(existingItemListeners);
         }
+        this.outdatedItemListeners = new ArrayList<>();
+        if (outdatedItemListeners != null) {
+            this.outdatedItemListeners.addAll(outdatedItemListeners);
+        }
+        this.clearUntrustedData = clearUntrustedData;
         this.timeoutInMillis = timeoutInMillis;
         this.store = store;
+        this.tableViewShutDownListener = tableViewShutDownListener;
         this.cache = store.getMetadataCache(clazz,
                 MetadataCacheConfig.<T>builder()
                         .expireAfterWriteMillis(-1)
@@ -115,6 +161,12 @@ public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T>
                         .asyncReloadConsumer(this::consumeAsyncReload)
                         .build());
         store.registerListener(this::handleNotification);
+        // Session events are only subscribed to for AbstractMetadataStore implementations. With any other store,
+        // handleSessionEvent is not registered, so session loss is not detected automatically; no warning is logged
+        // here since ServiceUnitStateMetadataStoreTableViewImpl checks the "zookeeperSessionExpiredPolicy" setting.
+        if (store instanceof AbstractMetadataStore abstractMetadataStore) {
+            abstractMetadataStore.registerSessionListener(this::handleSessionEvent);
+        }
     }
 
     public void start() throws MetadataStoreException {
@@ -159,6 +211,59 @@ public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T>
         return updated.booleanValue();
     }
 
+    /** Notifies outdated-item listeners on session loss and reloads all items once the session is re-established. */
+    public void handleSessionEvent(SessionEvent sessionEvent) {
+        if (CollectionUtils.isEmpty(outdatedItemListeners)) {
+            log.warn("{} Skipped handling metadata store session event {} because no outdatedItemListeners are set",
+                name, sessionEvent);
+            return;
+        }
+        if (sessionEvent == SessionEvent.SessionLost) {
+            Map<String, T> snapshot = new HashMap<>(data);
+            log.warn("{} metadata store session lost, notifying outdated item listeners of {}", name, snapshot);
+            // Notify the shutdown listener at most once, even when listeners fail for several items.
+            boolean shutdownNotified = false;
+            for (Map.Entry<String, T> entry : snapshot.entrySet()) {
+                for (var listener : outdatedItemListeners) {
+                    try {
+                        listener.accept(entry.getKey(), entry.getValue());
+                        if (clearUntrustedData) {
+                            data.remove(entry.getKey());
+                        }
+                    } catch (Throwable e) {
+                        if (tableViewShutDownListener == null) {
+                            log.warn("{} failed to listen item whose state is unknown because of metadata store"
+                                    + " session lost. key:{}, val:{}", name, entry.getKey(), entry.getValue(), e);
+                        } else {
+                            log.warn("{} Shutdown table view, because failed to listen item whose state is unknown due"
+                                    + " to metadata store session lost. key:{}, val:{}", name, entry.getKey(),
+                                    entry.getValue(), e);
+                            if (!shutdownNotified) {
+                                shutdownNotified = true;
+                                tableViewShutDownListener.accept(e);
+                            }
+                        }
+                        // Skip the remaining listeners of this item and continue with the next item.
+                        break;
+                    }
+                }
+            }
+        } else if (sessionEvent == SessionEvent.SessionReestablished) {
+            log.info("{} Refilling existing items after metadata store session reestablished", name);
+            // Print per-item details: the session has just expired, so the reload is worth tracing.
+            fillAsync(null, true).exceptionally(ex -> {
+                if (tableViewShutDownListener == null) {
+                    log.warn("{} failed to fill existing items after session reestablished", name, ex);
+                } else {
+                    log.error("{} Shutdown table view because failed to fill existing items after session"
+                        + " reestablished", name, ex);
+                    tableViewShutDownListener.accept(ex);
+                }
+                return null;
+            });
+        }
+    }
+
     private void handleTailItem(String key, T val) {
         if (updateData(key, val)) {
             if (log.isDebugEnabled()) {
@@ -234,51 +332,84 @@ public class MetadataStoreTableViewImpl<T> implements MetadataStoreTableView<T>
                 });
     }
 
+    /**
+     * Synchronously loads every existing item under {@code pathPrefix}, waiting at most
+     * {@code min(timeoutInMillis, FILL_TIMEOUT_IN_MILLIS)} milliseconds. Only intended for start-up: use
+     * {@link #fillAsync(AtomicLong, boolean)} in other cases, otherwise blocking here may cause a thread deadlock.
+     *
+     * @throws MetadataStoreException if loading fails, times out, or the calling thread is interrupted
+     */
     private void fill() throws MetadataStoreException {
-        final var deadline = System.currentTimeMillis() + FILL_TIMEOUT_IN_MILLIS;
-        log.info("{} start filling existing items under the pathPrefix:{}", name, pathPrefix);
-        ConcurrentLinkedDeque<String> q = new ConcurrentLinkedDeque<>();
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        q.add(pathPrefix);
-        LongAdder count = new LongAdder();
-        while (!q.isEmpty()) {
-            var now = System.currentTimeMillis();
-            if (now >= deadline) {
-                String err = name + " failed to fill existing items in "
-                        + TimeUnit.MILLISECONDS.toSeconds(FILL_TIMEOUT_IN_MILLIS) + " secs. Filled count:"
-                        + count.sum();
-                log.error(err);
-                throw new MetadataStoreException(err);
-            }
-            int size = Math.min(MAX_CONCURRENT_METADATA_OPS_DURING_FILL, q.size());
-            for (int i = 0; i < size; i++) {
-                String path = q.poll();
-                futures.add(store.getChildren(path)
-                        .thenCompose(children -> {
-                            // The path is leaf
-                            if (children.isEmpty()) {
-                                count.increment();
-                                return handleExisting(path);
-                            } else {
-                                for (var child : children) {
-                                    q.add(path + "/" + child);
-                                }
-                                return CompletableFuture.completedFuture(null);
-                            }
-                        }));
-            }
-            try {
-                FutureUtil.waitForAll(futures).get(
-                        Math.min(timeoutInMillis, deadline - now),
-                        TimeUnit.MILLISECONDS);
-            } catch (Throwable e) {
-                Throwable c = FutureUtil.unwrapCompletionException(e);
-                log.error("{} failed to fill existing items", name, c);
-                throw new MetadataStoreException(c);
-            }
-            futures.clear();
+        AtomicLong loadedCounter = new AtomicLong();
+        long maxWaitTime = Math.min(timeoutInMillis, FILL_TIMEOUT_IN_MILLIS);
+        try {
+            fillAsync(loadedCounter, false).get(maxWaitTime, TimeUnit.MILLISECONDS);
+            log.info("{} completed filling existing items with size:{}", name, loadedCounter.get());
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                // Restore the interrupt status so that callers can still observe it.
+                Thread.currentThread().interrupt();
+            }
+            Throwable cause = FutureUtil.unwrapCompletionException(e);
+            String err = name + " failed to fill existing items in "
+                    + TimeUnit.MILLISECONDS.toSeconds(maxWaitTime) + " secs. Filled count:"
+                    + loadedCounter.get();
+            log.error(err, cause);
+            throw new MetadataStoreException(err, cause);
         }
-        log.info("{} completed filling existing items with size:{}", name, count.sum());
+    }
+
+    /**
+     * Recursively loads the items stored under {@code path}: a leaf node is passed to {@code handleExisting}, a
+     * directory node is expanded into its children. {@code rootPath} itself is never handled as a leaf, even when
+     * it has no children.
+     *
+     * @param rootPath     root of the walk
+     * @param path         node to visit
+     * @param count        incremented once per leaf node; may be null
+     * @param printDetails whether to log every visited node at INFO level
+     * @return a future completed once every leaf below {@code path} has been handled
+     */
+    private CompletableFuture<Void> handleExistingLeafs(String rootPath, String path, @Nullable AtomicLong count,
+                                                        boolean printDetails) {
+        return store.getChildren(path).thenCompose(children -> {
+            if (children.isEmpty()) {
+                // Skip root path.
+                if (rootPath.equals(path)) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                // Leaf node.
+                if (count != null) {
+                    count.incrementAndGet();
+                }
+                if (printDetails) {
+                    log.info("{} handling existing leaf {}", name, path);
+                }
+                return handleExisting(path);
+            } else {
+                // Dir node. Concurrent reads are not capped: the metadata store client is expected to batch the
+                // operations it receives and to split oversized batches into multiple requests by itself.
+                if (printDetails) {
+                    log.info("{} handling existing dir {}", name, path);
+                }
+                List<CompletableFuture<Void>> futureList = new ArrayList<>();
+                for (var child : children) {
+                    futureList.add(handleExistingLeafs(rootPath, path + "/" + child, count, printDetails));
+                }
+                return FutureUtil.waitForAll(futureList);
+            }
+        });
+    }
+
+    /**
+     * Asynchronously loads every existing item under {@code pathPrefix}.
+     *
+     * @param loadedCounter optional counter incremented once per loaded leaf, for reporting only
+     * @param printDetails  whether to log every visited node at INFO level
+     * @return a future completed once all existing items have been handled
+     */
+    private CompletableFuture<Void> fillAsync(@Nullable AtomicLong loadedCounter, boolean printDetails) {
+        return handleExistingLeafs(pathPrefix, pathPrefix, loadedCounter, printDetails);
     }
 
 

Reply via email to