lifepuzzlefun commented on code in PR #20512:
URL: https://github.com/apache/pulsar/pull/20512#discussion_r1227453146
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle,
String broker) {
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
+
+ /**
+ * scanAndCleanupNonExistBundleData.
+ *
+ * When the bundle is split the loadManager's loadData.bundleData
+ * and the bundle-data in metadataStore need cleanup.
+ *
+ * Previous cleanup logic is in the `updateBundleData` method which rely on the loadData.bundleData.
+ *
+ * The reason to move the cleanup logic here is that the bundleData report by each broker
+ * may have a delay in `LoadReportUpdaterTask`,
+ * so the view in the LoadManager of the whole cluster data may not contain all the bundle data,
+ * which may cause active bundle-data (but not report to loadManager) to be deleted.
+ *
+ * To avoid active bundle-data to be deleted, this method use bundle-data in localPolicy
+ * as the truth active bundle source.
+ */
+ public void scanAndCleanupNonExistBundleData() {
+ Map<String, BundleData> loadManagerBundleData =
this.loadData.getBundleData();
+
+ Set<String> allBundles =
ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+ allBundles.addAll(this.loadData.getBundleData().keySet());
+
+ // namespace -> bundles
+ Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new
HashMap<>();
+ for (String bundleName : allBundles) {
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+ NamespaceBundle bundle =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(namespaceName, bundleRange);
+ namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) ->
new HashSet<>()).add(bundle);
+ }
+
+ Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+ // check each namespace if bundle is valid in the cluster.
+ for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry :
namespaceLocalBundleData.entrySet()) {
+ if (!isLeader()) {
+ return;
+ }
+
+ // check if namespace exist.
+ NamespaceName namespace =
NamespaceName.get(namespaceEntry.getKey());
+ Set<NamespaceBundle> loadManagerKnownBundles =
namespaceEntry.getValue();
+
+ Optional<Policies> nsPolicies;
+
+ try {
+ nsPolicies = pulsar.getPulsarResources()
+ .getNamespaceResources().getPolicies(namespace);
+ } catch (MetadataStoreException e) {
+ log.error("error when get policies for namespace {}",
namespace, e);
+ continue;
+ }
+
+ if (nsPolicies.isEmpty()) {
+ loadManagerKnownBundles.forEach((bundle) -> {
+ String bundleString = bundle.toString();
+ loadManagerBundleData.remove(bundleString);
+ if (isLeader()) {
+ log.info("namespace [{}] is not exist in cluster, "
+ + "remove load-balance bundle-data {} from metadata store.", namespace, bundleString);
+ deleteBundleDataFromMetadataStore(bundleString);
+ }
+ });
+
+ continue;
+ }
+
+ // namespace is being deleted, do nothing because delete namespace will handle the bundle-data deletion.
+ if (nsPolicies.get().deleted) {
+ if (log.isDebugEnabled()) {
+ log.debug("ignore namespace {} because the namespace is mark deleted", namespace);
+ }
+ continue;
+ }
+
+ NamespaceBundles namespaceBundles =
+ pulsar.getNamespaceService()
+ .getNamespaceBundleFactory()
+ .getBundles(namespace);
+
+ if (log.isDebugEnabled()) {
+ log.debug("bundles in namespace {} is {}", namespace,
namespaceBundles.getBundles());
+ }
+
+ if (namespaceBundles == null) {
+ continue;
+ }
+
+ realClusterBundles.clear();
+ realClusterBundles.addAll(namespaceBundles.getBundles());
+
+ Sets.SetView<NamespaceBundle> alreadyNotExistBundles =
+ Sets.difference(loadManagerKnownBundles,
realClusterBundles);
+
+ alreadyNotExistBundles.forEach((bundle) -> {
Review Comment:
Yes, there are two ways to trigger a bundle split.
1. The LoadManager leader triggers the bundle split task.
2. The admin API, which is executed by the namespace bundle owner. The bundle
owner does not delete the bundle data from the metadata store.
Case two is covered by the cleanup here.
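
To illustrate case two, here is a minimal, self-contained sketch of the set-difference step. It is not the PR's code: the class name, the method name `staleBundles`, and the bundle names are hypothetical, and plain strings stand in for `NamespaceBundle`. It only shows how a parent bundle left behind by an admin-triggered split would be picked up for cleanup.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the cleanup step; not part of the Pulsar code base.
class BundleCleanupSketch {

    // Returns the bundles the load manager still tracks but that the
    // namespace no longer contains (for example, a parent bundle after a split).
    static Set<String> staleBundles(Set<String> knownToLoadManager,
                                    Set<String> currentInNamespace) {
        Set<String> stale = new HashSet<>(knownToLoadManager);
        stale.removeAll(currentInNamespace);
        return stale;
    }

    public static void main(String[] args) {
        // Assumed state after an admin-triggered split of the full-range bundle:
        // the load manager still holds data for the parent bundle.
        Set<String> known = Set.of(
                "tenant/ns/0x00000000_0xffffffff",
                "tenant/ns/0x00000000_0x7fffffff");
        // Bundles the namespace actually has after the split.
        Set<String> current = Set.of(
                "tenant/ns/0x00000000_0x7fffffff",
                "tenant/ns/0x7fffffff_0xffffffff");

        // Only the parent bundle is reported as stale.
        System.out.println(staleBundles(known, current));
    }
}
```

In the PR the same idea is expressed with `Sets.difference(loadManagerKnownBundles, realClusterBundles)`; the sketch uses `removeAll` on a copy so that it runs without Guava.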