lifepuzzlefun commented on code in PR #20512:
URL: https://github.com/apache/pulsar/pull/20512#discussion_r1233027650


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
        return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When a bundle is split, the load manager's loadData.bundleData
+     * and the bundle-data in the metadata store need cleanup.
+     *
+     * The previous cleanup logic was in the `updateBundleData` method, which relies on loadData.bundleData.
+     *
+     * The reason for moving the cleanup logic here is that the bundle data reported by each broker
+     * may be delayed in `LoadReportUpdaterTask`,
+     * so the load manager's view of the whole cluster may not contain all the bundle data,
+     * which may cause active bundle-data (not yet reported to the load manager) to be deleted.
+     *
+     * To avoid deleting active bundle-data, this method uses the bundle-data in `localPolicy`
+     * as the source of truth for active bundles.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(loadManagerBundleData.keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();
+        for (String bundleName : allBundles) {
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+            NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) -> new HashSet<>()).add(bundle);
+        }
+
+        Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+        // check, for each namespace, whether its bundles are still valid in the cluster.
+        for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry : namespaceLocalBundleData.entrySet()) {
+            if (!isLeader()) {
+                return;
+            }
+
+            // check if the namespace exists.
+            NamespaceName namespace = NamespaceName.get(namespaceEntry.getKey());
+            Set<NamespaceBundle> loadManagerKnownBundles = namespaceEntry.getValue();
+
+            Optional<Policies> nsPolicies;
+
+            try {
+                nsPolicies = pulsar.getPulsarResources()
+                        .getNamespaceResources().getPolicies(namespace);
+            } catch (MetadataStoreException e) {
+                log.error("Error getting policies for namespace {}", namespace, e);
+                continue;
+            }
+
+            if (nsPolicies.isEmpty()) {
+                loadManagerKnownBundles.forEach((bundle) -> {
+                    String bundleString = bundle.toString();
+                    loadManagerBundleData.remove(bundleString);
+                    if (isLeader()) {
+                        log.info("namespace [{}] does not exist in the cluster, "
+                                + "removing load-balance bundle-data {} from the metadata store.",
+                                namespace, bundleString);
+                        deleteBundleDataFromMetadataStore(bundleString);
+                    }
+                });
+
+                continue;
+            }
+
+            // The namespace is being deleted; do nothing, because deleting the namespace
+            // will handle the bundle-data deletion.
+            if (nsPolicies.get().deleted) {
+                if (log.isDebugEnabled()) {
+                    log.debug("ignore namespace {} because the namespace is marked deleted", namespace);
+                }
+                continue;
+            }
+
+            NamespaceBundles namespaceBundles =
+                    pulsar.getNamespaceService()
+                            .getNamespaceBundleFactory()
+                            .getBundles(namespace);
+
+            // Null-check before the debug log to avoid an NPE on getBundles().
+            if (namespaceBundles == null) {
+                continue;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("bundles in namespace {} are {}", namespace, namespaceBundles.getBundles());
+            }
+
+            realClusterBundles.clear();
+            realClusterBundles.addAll(namespaceBundles.getBundles());
+
+            Sets.SetView<NamespaceBundle> alreadyNotExistBundles =
+                    Sets.difference(loadManagerKnownBundles, realClusterBundles);
+
+            alreadyNotExistBundles.forEach((bundle) -> {

Review Comment:
   In my opinion, the **active** in https://github.com/apache/pulsar/pull/13974 only means
   
   > clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
   
   not "no traffic". If we want logic to clean up no-traffic bundle-data, the code needs logic that verifies whether the bundle actually has traffic; see the sketch below.
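   
   A minimal sketch of that missing check, assuming a hypothetical helper inside ModularLoadManagerImpl (the broker-local `BrokerService#getBundleStats()` and the public `NamespaceBundleStats` fields exist today; the helper itself is only an illustration, not code from this PR):
   
   ```java
   // Hypothetical helper inside ModularLoadManagerImpl (which already has the
   // `pulsar` field); an illustration of a traffic check, not code from this PR.
   private boolean bundleHasTraffic(String bundleString) {
       // Broker-local stats only; the fields come from NamespaceBundleStats.
       NamespaceBundleStats stats = pulsar.getBrokerService().getBundleStats().get(bundleString);
       return stats != null && (stats.msgRateIn > 0 || stats.msgRateOut > 0 || stats.topics > 0);
   }
   ```
   
   Note that `getBundleStats()` is only the broker-local view, so the leader would need the aggregated reports to decide cluster-wide, which runs straight into the report delay this PR's javadoc describes.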
   
   Alternatively, if the logic relies purely on the load report (i.e., if the bundle-data is not reported, the bundle is assumed to have no traffic at all), then there should be logic when generating the bundle metrics so that the load report does not contain a bundle with no traffic; a sketch of that missing filter follows. But neither piece of logic exists on master.
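   
   Again hypothetical, assuming the way `updateLocalBrokerData` feeds bundle stats into `localData.update(...)` today; the zero-rate filter is exactly the part that does not exist on master:
   
   ```java
   // Hypothetical filter while building the load report: drop zero-traffic
   // bundles on purpose, so that "not reported" really means "no traffic".
   Map<String, NamespaceBundleStats> reported = new HashMap<>();
   pulsar.getBrokerService().getBundleStats().forEach((bundle, stats) -> {
       if (stats.msgRateIn > 0 || stats.msgRateOut > 0) {
           reported.put(bundle, stats);
       }
   });
   localData.update(systemResourceUsage, reported);
   ```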
   
   And if we treat the current logic as a cleanup for no-traffic bundle-data, I have to say this is a bug, not a feature: my production environment has 10000 bundles with traffic, yet fewer than 100 bundle-data entries on the metadata store at any time, because of this cleanup logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
