heesung-sn commented on code in PR #20512:
URL: https://github.com/apache/pulsar/pull/20512#discussion_r1227091100


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When the bundle is split the loadManager's loadData.bundleData
+     * and the bundle-data in metadataStore need cleanup.
+     *
+     * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+     *
+     * The reason to move the cleanup logic here is that the bundleData report by each broker
+     * may have a delay in `LoadReportUpdaterTask`,
+     * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+     * which may cause active bundle-data (but not report to loadManager) to be deleted.
+     *
+     * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+     * as the truth active bundle source.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(this.loadData.getBundleData().keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();
+        for (String bundleName : allBundles) {
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+            NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) -> new HashSet<>()).add(bundle);
+        }
+
+        Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+        // check each namespace if bundle is valid in the cluster.
+        for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry : namespaceLocalBundleData.entrySet()) {
+            if (!isLeader()) {
+                return;

Review Comment:
   Can we add a warn log here? I think this function is supposed to be run by the leader only.
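A minimal sketch of the suggestion, assuming only that leaving the loop because leadership was lost should be logged rather than silent. `LeaderGuardedCleanup`, `cleanup`, and the `BooleanSupplier` standing in for `isLeader()` are hypothetical names, and `java.util.logging` stands in for the broker's SLF4J logger:

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

// Hypothetical stand-in for the cleanup loop; not the PR's actual code.
class LeaderGuardedCleanup {
    private static final Logger LOG = Logger.getLogger(LeaderGuardedCleanup.class.getName());

    // Returns how many namespaces were processed before leadership was lost (if it was).
    static int cleanup(List<String> namespaces, BooleanSupplier isLeader) {
        int processed = 0;
        for (String ns : namespaces) {
            if (!isLeader.getAsBoolean()) {
                // Surface the early exit instead of returning silently.
                LOG.warning("Stopping bundle-data cleanup: this broker is no longer the leader"
                        + " (processed " + processed + " of " + namespaces.size() + " namespaces)");
                return processed;
            }
            processed++; // placeholder for the per-namespace cleanup work
        }
        return processed;
    }
}
```

Checking leadership at the top of every iteration keeps the window small in which a broker that just lost leadership keeps deleting metadata.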



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When the bundle is split the loadManager's loadData.bundleData
+     * and the bundle-data in metadataStore need cleanup.
+     *
+     * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+     *
+     * The reason to move the cleanup logic here is that the bundleData report by each broker
+     * may have a delay in `LoadReportUpdaterTask`,
+     * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+     * which may cause active bundle-data (but not report to loadManager) to be deleted.
+     *
+     * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+     * as the truth active bundle source.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(this.loadData.getBundleData().keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();

Review Comment:
   nit: maybe we can use the stream API's `Collectors.groupingBy` here.
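One way the grouping loop could look with `Collectors.groupingBy`, sketched under the assumption that a bundle name has the form `tenant/namespace/range`, so the namespace is everything before the last `/` and the range everything after it. `namespaceOf` and `rangeOf` are hypothetical stand-ins for `LoadManagerShared.getNamespaceNameFromBundleName` and `getBundleRangeFromBundleName`; in the PR, the `mapping` step would call `getBundle(namespaceName, bundleRange)` to produce `NamespaceBundle` values instead of range strings:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of Pulsar.
class GroupBundlesByNamespace {
    // Assumed format "tenant/namespace/range": namespace = text before the last '/'.
    static String namespaceOf(String bundleName) {
        return bundleName.substring(0, bundleName.lastIndexOf('/'));
    }

    // Assumed format "tenant/namespace/range": range = text after the last '/'.
    static String rangeOf(String bundleName) {
        return bundleName.substring(bundleName.lastIndexOf('/') + 1);
    }

    // namespace -> bundle ranges, replacing the computeIfAbsent loop with a single collector.
    static Map<String, Set<String>> group(Set<String> bundleNames) {
        return bundleNames.stream()
                .collect(Collectors.groupingBy(
                        GroupBundlesByNamespace::namespaceOf,
                        Collectors.mapping(GroupBundlesByNamespace::rangeOf, Collectors.toSet())));
    }
}
```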



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When the bundle is split the loadManager's loadData.bundleData
+     * and the bundle-data in metadataStore need cleanup.
+     *
+     * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+     *
+     * The reason to move the cleanup logic here is that the bundleData report by each broker
+     * may have a delay in `LoadReportUpdaterTask`,
+     * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+     * which may cause active bundle-data (but not report to loadManager) to be deleted.
+     *
+     * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+     * as the truth active bundle source.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(this.loadData.getBundleData().keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();
+        for (String bundleName : allBundles) {
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+            NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) -> new HashSet<>()).add(bundle);
+        }
+
+        Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+        // check each namespace if bundle is valid in the cluster.
+        for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry : namespaceLocalBundleData.entrySet()) {
+            if (!isLeader()) {
+                return;
+            }
+
+            // check if namespace exist.
+            NamespaceName namespace = NamespaceName.get(namespaceEntry.getKey());
+            Set<NamespaceBundle> loadManagerKnownBundles = namespaceEntry.getValue();
+
+            Optional<Policies> nsPolicies;
+
+            try {
+                nsPolicies = pulsar.getPulsarResources()
+                        .getNamespaceResources().getPolicies(namespace);
+            } catch (MetadataStoreException e) {
+                log.error("error when get policies for namespace {}", namespace, e);
+                continue;
+            }
+
+            if (nsPolicies.isEmpty()) {
+                loadManagerKnownBundles.forEach((bundle) -> {
+                    String bundleString = bundle.toString();
+                    loadManagerBundleData.remove(bundleString);
+                    if (isLeader()) {
+                        log.info("namespace [{}] is not exist in cluster, "
+                                + "remove load-balance bundle-data {} from metadata store.", namespace, bundleString);
+                        deleteBundleDataFromMetadataStore(bundleString);
+                    }
+                });
+
+                continue;
+            }
+
+            // namespace is being deleted, do nothing because delete namespace will handle the bundle-data deletion.
+            if (nsPolicies.get().deleted) {
+                if (log.isDebugEnabled()) {
+                    log.debug("ignore namespace {} because the namespace is mark deleted", namespace);
+                }
+                continue;
+            }
+
+            NamespaceBundles namespaceBundles =
+                    pulsar.getNamespaceService()
+                            .getNamespaceBundleFactory()
+                            .getBundles(namespace);
+
+            if (log.isDebugEnabled()) {
+                log.debug("bundles in namespace {} is {}", namespace, namespaceBundles.getBundles());
+            }
+
+            if (namespaceBundles == null) {
+                continue;
+            }
+
+            realClusterBundles.clear();
+            realClusterBundles.addAll(namespaceBundles.getBundles());
+
+            Sets.SetView<NamespaceBundle> alreadyNotExistBundles =
+                    Sets.difference(loadManagerKnownBundles, realClusterBundles);
+
+            alreadyNotExistBundles.forEach((bundle) -> {

Review Comment:
   Important to clarify:
   I think this will only remove parent bundles, but we already remove the parent bundle right after the split, as in the code below.
   Also, this will not remove bundles that become inactive (no traffic), which is different from the existing behavior.
   
   ```
   this.pulsar.getNamespaceService().getNamespaceBundleFactory()
           .invalidateBundleCache(NamespaceName.get(namespaceName));
   deleteBundleDataFromMetadataStore(bundleName);
   ```
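The reviewer's point follows from how a set difference behaves. In this sketch, plain `java.util` sets stand in for Guava's `Sets.difference`, `StaleBundleDiff` is a hypothetical name, and the bundle ranges are made-up examples: only ranges that the load manager still tracks but that no longer exist in the namespace (such as a split parent) are returned.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; plain java.util sets stand in for Guava's Sets.difference.
class StaleBundleDiff {
    // Bundles the load manager still knows about but that no longer exist in the namespace.
    // Same elements as Sets.difference(known, current), but copied eagerly instead of a live view.
    static Set<String> stale(Set<String> knownToLoadManager, Set<String> currentInCluster) {
        Set<String> result = new HashSet<>(knownToLoadManager);
        result.removeAll(currentInCluster);
        return result;
    }
}
```

If `0x80000000_0xffffffff` is a child bundle with no traffic, it still exists in the namespace, so the difference never selects it. That is the behavioral change relative to the old activity-based cleanup that the comment points out.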
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When the bundle is split the loadManager's loadData.bundleData
+     * and the bundle-data in metadataStore need cleanup.
+     *
+     * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+     *
+     * The reason to move the cleanup logic here is that the bundleData report by each broker
+     * may have a delay in `LoadReportUpdaterTask`,
+     * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+     * which may cause active bundle-data (but not report to loadManager) to be deleted.
+     *
+     * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+     * as the truth active bundle source.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(this.loadData.getBundleData().keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();
+        for (String bundleName : allBundles) {
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+            NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) -> new HashSet<>()).add(bundle);
+        }
+
+        Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+        // check each namespace if bundle is valid in the cluster.
+        for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry : namespaceLocalBundleData.entrySet()) {
+            if (!isLeader()) {
+                return;
+            }
+
+            // check if namespace exist.
+            NamespaceName namespace = NamespaceName.get(namespaceEntry.getKey());
+            Set<NamespaceBundle> loadManagerKnownBundles = namespaceEntry.getValue();
+
+            Optional<Policies> nsPolicies;
+
+            try {
+                nsPolicies = pulsar.getPulsarResources()
+                        .getNamespaceResources().getPolicies(namespace);
+            } catch (MetadataStoreException e) {
+                log.error("error when get policies for namespace {}", namespace, e);
+                continue;
+            }
+
+            if (nsPolicies.isEmpty()) {
+                loadManagerKnownBundles.forEach((bundle) -> {
+                    String bundleString = bundle.toString();
+                    loadManagerBundleData.remove(bundleString);
+                    if (isLeader()) {
+                        log.info("namespace [{}] is not exist in cluster, "
+                                + "remove load-balance bundle-data {} from metadata store.", namespace, bundleString);
+                        deleteBundleDataFromMetadataStore(bundleString);

Review Comment:
   According to the comment below, `// namespace is being deleted, do nothing because delete namespace will handle the bundle-data deletion.`, doesn't `delete namespace` already handle it? Why do we need the cleanup logic here too?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When the bundle is split the loadManager's loadData.bundleData
+     * and the bundle-data in metadataStore need cleanup.
+     *
+     * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+     *
+     * The reason to move the cleanup logic here is that the bundleData report by each broker
+     * may have a delay in `LoadReportUpdaterTask`,
+     * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+     * which may cause active bundle-data (but not report to loadManager) to be deleted.
+     *
+     * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+     * as the truth active bundle source.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());

Review Comment:
   Why do we need to create this `allBundles`? Can we just iterate over `this.loadData.getBundleData().keySet()`?
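A sketch of iterating the live key set directly, assuming (this hunk does not confirm it) that `loadData.getBundleData()` is backed by a `ConcurrentHashMap`. Its iterators are weakly consistent, so the map can even be modified inside the loop without a `ConcurrentModificationException`. In the PR the removals happen in the later loop over `namespaceLocalBundleData`, not while iterating the key set, so the copy into `allBundles` is not needed for that reason either. `IterateWithoutCopy` and `removeNamespace` are hypothetical names:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; assumes the bundle-data map is a ConcurrentHashMap.
class IterateWithoutCopy {
    // Removes every bundle whose name starts with the given namespace prefix while
    // iterating the live key set. ConcurrentHashMap iterators are weakly consistent,
    // so modifying the map inside the loop does not throw ConcurrentModificationException.
    static <V> void removeNamespace(ConcurrentHashMap<String, V> bundleData, String namespacePrefix) {
        for (String bundle : bundleData.keySet()) {
            if (bundle.startsWith(namespacePrefix)) {
                bundleData.remove(bundle);
            }
        }
    }
}
```

With a plain `HashMap`, the same loop would need either a snapshot copy or `Iterator.remove()`.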



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When the bundle is split the loadManager's loadData.bundleData
+     * and the bundle-data in metadataStore need cleanup.
+     *
+     * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+     *
+     * The reason to move the cleanup logic here is that the bundleData report by each broker
+     * may have a delay in `LoadReportUpdaterTask`,
+     * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+     * which may cause active bundle-data (but not report to loadManager) to be deleted.
+     *
+     * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+     * as the truth active bundle source.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(this.loadData.getBundleData().keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();
+        for (String bundleName : allBundles) {
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+            NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) -> new HashSet<>()).add(bundle);
+        }
+
+        Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+        // check each namespace if bundle is valid in the cluster.
+        for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry : namespaceLocalBundleData.entrySet()) {
+            if (!isLeader()) {
+                return;
+            }
+
+            // check if namespace exist.
+            NamespaceName namespace = NamespaceName.get(namespaceEntry.getKey());
+            Set<NamespaceBundle> loadManagerKnownBundles = namespaceEntry.getValue();
+
+            Optional<Policies> nsPolicies;
+
+            try {
+                nsPolicies = pulsar.getPulsarResources()
+                        .getNamespaceResources().getPolicies(namespace);
+            } catch (MetadataStoreException e) {
+                log.error("error when get policies for namespace {}", namespace, e);
+                continue;
+            }
+
+            if (nsPolicies.isEmpty()) {
+                loadManagerKnownBundles.forEach((bundle) -> {
+                    String bundleString = bundle.toString();
+                    loadManagerBundleData.remove(bundleString);
+                    if (isLeader()) {
+                        log.info("namespace [{}] is not exist in cluster, "
+                                + "remove load-balance bundle-data {} from metadata store.", namespace, bundleString);
+                        deleteBundleDataFromMetadataStore(bundleString);
+                    }
+                });
+
+                continue;
+            }
+
+            // namespace is being deleted, do nothing because delete namespace will handle the bundle-data deletion.
+            if (nsPolicies.get().deleted) {
+                if (log.isDebugEnabled()) {
+                    log.debug("ignore namespace {} because the namespace is mark deleted", namespace);
+                }
+                continue;
+            }
+
+            NamespaceBundles namespaceBundles =
+                    pulsar.getNamespaceService()
+                            .getNamespaceBundleFactory()
+                            .getBundles(namespace);
+
+            if (log.isDebugEnabled()) {
+                log.debug("bundles in namespace {} is {}", namespace, namespaceBundles.getBundles());
+            }
+
+            if (namespaceBundles == null) {
+                continue;

Review Comment:
   It seems like we need logging here, since this is an exceptional case?
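In the quoted hunk, the debug log dereferences `namespaceBundles.getBundles()` before the `null` check, so with debug logging enabled a `null` result would throw a `NullPointerException` before reaching the check. A minimal sketch that checks for `null` first and logs the unexpected case at warning level. The `lookup` function is a hypothetical stand-in for `getNamespaceBundleFactory().getBundles(namespace)`, `NullSafeBundleLookup` is a hypothetical name, and `java.util.logging` replaces SLF4J:

```java
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical helper; `lookup` stands in for getNamespaceBundleFactory().getBundles(namespace).
class NullSafeBundleLookup {
    private static final Logger LOG = Logger.getLogger(NullSafeBundleLookup.class.getName());

    static List<String> currentBundles(String namespace, Function<String, List<String>> lookup) {
        List<String> bundles = lookup.apply(namespace);
        // Check for null before anything (including debug logging) dereferences the result.
        if (bundles == null) {
            LOG.warning("Cannot get bundles for namespace " + namespace
                    + ", skipping bundle-data cleanup for it");
            return List.of();
        }
        // Supplier overload: the message is only built when FINE is enabled.
        LOG.fine(() -> "bundles in namespace " + namespace + " are " + bundles);
        return bundles;
    }
}
```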



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
         broker = broker.replaceFirst("http[s]?://", "");
         return this.bundleBrokerAffinityMap.put(bundle, broker);
     }
+
+    /**
+     * scanAndCleanupNonExistBundleData.
+     *
+     * When the bundle is split the loadManager's loadData.bundleData
+     * and the bundle-data in metadataStore need cleanup.
+     *
+     * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+     *
+     * The reason to move the cleanup logic here is that the bundleData report by each broker
+     * may have a delay in `LoadReportUpdaterTask`,
+     * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+     * which may cause active bundle-data (but not report to loadManager) to be deleted.
+     *
+     * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+     * as the truth active bundle source.
+     */
+    public void scanAndCleanupNonExistBundleData() {
+        Map<String, BundleData> loadManagerBundleData = this.loadData.getBundleData();
+
+        Set<String> allBundles = ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+        allBundles.addAll(this.loadData.getBundleData().keySet());
+
+        // namespace -> bundles
+        Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new HashMap<>();
+        for (String bundleName : allBundles) {
+            final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+            final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+            NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundle(namespaceName, bundleRange);
+            namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) -> new HashSet<>()).add(bundle);
+        }
+
+        Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+        // check each namespace if bundle is valid in the cluster.
+        for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry : namespaceLocalBundleData.entrySet()) {
+            if (!isLeader()) {
+                return;
+            }
+
+            // check if namespace exist.
+            NamespaceName namespace = NamespaceName.get(namespaceEntry.getKey());
+            Set<NamespaceBundle> loadManagerKnownBundles = namespaceEntry.getValue();
+
+            Optional<Policies> nsPolicies;
+
+            try {
+                nsPolicies = pulsar.getPulsarResources()
+                        .getNamespaceResources().getPolicies(namespace);
+            } catch (MetadataStoreException e) {
+                log.error("error when get policies for namespace {}", namespace, e);
+                continue;
+            }
+
+            if (nsPolicies.isEmpty()) {
+                loadManagerKnownBundles.forEach((bundle) -> {
+                    String bundleString = bundle.toString();
+                    loadManagerBundleData.remove(bundleString);
+                    if (isLeader()) {
+                        log.info("namespace [{}] is not exist in cluster, "

Review Comment:
   nit: `namespace [{}] policy does not exist`



-- 
This is an automated message from the Apache Git Service.