heesung-sn commented on code in PR #20512:
URL: https://github.com/apache/pulsar/pull/20512#discussion_r1233420854


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When a bundle is split, the loadManager's loadData.bundleData
+     * and the bundle-data in the metadataStore need to be cleaned up.
+     *
+     * The previous cleanup logic was in the `updateBundleData` method, which relies on the loadData.bundleData.
+     *
+     * The cleanup logic is moved here because the bundleData reported by each broker
+     * may be delayed by `LoadReportUpdaterTask`,
+     * so the LoadManager's view of the whole cluster may not contain all the bundle data,
+     * which may cause active bundle-data (not yet reported to the loadManager) to be deleted.
+     *
+     * To avoid deleting active bundle-data, this method uses the bundle-data in localPolicy
+     * as the source of truth for active bundles.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(this.loadData.getBundleData().keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();
+        for (String bundleName : allBundles) {
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+            NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) -> new HashSet<>()).add(bundle);
+        }
+
+        Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+        // check, for each namespace, whether its bundles are still valid in the cluster.
+        for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry : namespaceLocalBundleData.entrySet()) {
+            if (!isLeader()) {
+                return;
+            }
+
+            // check whether the namespace exists.
+            NamespaceName namespace = NamespaceName.get(namespaceEntry.getKey());
+            Set<NamespaceBundle> loadManagerKnownBundles = namespaceEntry.getValue();
+
+            Optional<Policies> nsPolicies;
+
+            try {
+                nsPolicies = pulsar.getPulsarResources()
+                        .getNamespaceResources().getPolicies(namespace);
+            } catch (MetadataStoreException e) {
+                log.error("error when getting policies for namespace {}", namespace, e);
+                continue;
+            }
+
+            if (nsPolicies.isEmpty()) {
+                loadManagerKnownBundles.forEach((bundle) -> {
+                    String bundleString = bundle.toString();
+                    loadManagerBundleData.remove(bundleString);
+                    if (isLeader()) {
+                        log.info("namespace [{}] does not exist in the cluster, "
+                                + "removing load-balance bundle-data {} from the metadata store.", namespace, bundleString);
+                        deleteBundleDataFromMetadataStore(bundleString);
+                    }
+                });
+
+                continue;
+            }
+
+            // the namespace is being deleted; do nothing, because namespace deletion will handle the bundle-data deletion.
+            if (nsPolicies.get().deleted) {
+                if (log.isDebugEnabled()) {
+                    log.debug("ignore namespace {} because the namespace is marked deleted", namespace);
+                }
+                continue;
+            }
+
+            NamespaceBundles namespaceBundles =
+                    pulsar.getNamespaceService()
+                            .getNamespaceBundleFactory()
+                            .getBundles(namespace);
+
+            if (namespaceBundles == null) {
+                continue;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("bundles in namespace {} are {}", namespace, namespaceBundles.getBundles());
+            }
+
+            realClusterBundles.clear();
+            realClusterBundles.addAll(namespaceBundles.getBundles());
+
+            Sets.SetView<NamespaceBundle> alreadyNotExistBundles =
+                    Sets.difference(loadManagerKnownBundles, realClusterBundles);
+
+            alreadyNotExistBundles.forEach((bundle) -> {

Review Comment:
   I guess inactive bundles are cleaned only after an unload (or split), when the unload or split forcefully cleans the bundle from the statsMap and other caches/stores.
   
   To confirm, can you try unloading the bundle after stopping the traffic in your test, and check whether its data is indeed cleaned only after the unload?
   
   If the above is true, the next lookup on that unloaded bundle should have recreated the bundle in the bundle cache and the metadata store. I don't understand why you didn't see this in your environment. I wonder if we need to dig further into why the bundles didn't get restored in the cache when they were reactivated.
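   For discussion, here is a minimal, self-contained sketch of the reconciliation that `scanAndCleanupNonExistBundleData` performs: group the load manager's known bundle names by namespace, then treat the namespace's current bundle set as the source of truth. It is a sketch under assumptions, not the PR's code: it uses plain strings instead of Pulsar's `NamespaceBundle`/`NamespaceBundles`, assumes bundle names look like `tenant/namespace/range`, and `NamespaceState`, `namespaceOf`, `rangeOf`, and `findStaleBundles` are hypothetical names. It only computes which entries would be removed; the real method also checks leadership and deletes from the metadata store.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class StaleBundleScan {

    // Hypothetical stand-in for namespace policies plus the namespace's live bundle ranges.
    // A missing map entry models "namespace has no policies" (it does not exist).
    record NamespaceState(boolean deleted, Set<String> liveRanges) {}

    // Hypothetical helper, assuming names like "tenant/ns/0x00000000_0x80000000".
    static String namespaceOf(String bundleName) {
        return bundleName.substring(0, bundleName.lastIndexOf('/'));
    }

    // Hypothetical helper returning the range part after the last '/'.
    static String rangeOf(String bundleName) {
        return bundleName.substring(bundleName.lastIndexOf('/') + 1);
    }

    static Set<String> findStaleBundles(Set<String> loadManagerBundles,
                                        Map<String, NamespaceState> cluster) {
        // Group the load manager's bundle names by namespace.
        Map<String, Set<String>> byNamespace = new HashMap<>();
        for (String name : loadManagerBundles) {
            byNamespace.computeIfAbsent(namespaceOf(name), k -> new HashSet<>()).add(rangeOf(name));
        }

        Set<String> stale = new TreeSet<>(); // sorted, for deterministic output
        for (Map.Entry<String, Set<String>> e : byNamespace.entrySet()) {
            NamespaceState state = cluster.get(e.getKey());
            if (state != null && state.deleted()) {
                continue; // namespace deletion is expected to clean up its own bundle data
            }
            for (String range : e.getValue()) {
                // No policies: every bundle is stale. Otherwise stale = known minus live.
                if (state == null || !state.liveRanges().contains(range)) {
                    stale.add(e.getKey() + "/" + range);
                }
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Set<String> known = Set.of(
                "t/ns1/0x00000000_0x80000000",
                "t/ns1/0x80000000_0xffffffff",
                "t/ns1/0x00000000_0xffffffff",    // parent of a split: no longer live
                "t/gone/0x00000000_0xffffffff",   // namespace without policies
                "t/dying/0x00000000_0xffffffff"); // namespace marked deleted
        Map<String, NamespaceState> cluster = Map.of(
                "t/ns1", new NamespaceState(false,
                        Set.of("0x00000000_0x80000000", "0x80000000_0xffffffff")),
                "t/dying", new NamespaceState(true, Set.of("0x00000000_0xffffffff")));
        System.out.println(findStaleBundles(known, cluster));
        // prints [t/gone/0x00000000_0xffffffff, t/ns1/0x00000000_0xffffffff]
    }
}
```

   Because liveness here is decided by the namespace's bundle ranges rather than by which brokers have reported load, a bundle whose load report is merely delayed keeps its data; only ranges that no longer exist (such as a split parent) or that belong to a namespace without policies are flagged.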
   



-- 
This is an automated message from the Apache Git Service.