lifepuzzlefun commented on code in PR #20512:
URL: https://github.com/apache/pulsar/pull/20512#discussion_r1233026849
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -1200,4 +1196,111 @@ public String setNamespaceBundleAffinity(String bundle,
String broker) {
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
+
+ /**
+ * scanAndCleanupNonExistBundleData.
+ *
+ * When the bundle is split the loadManager's loadData.bundleData
+ * and the bundle-data in metadataStore need cleanup.
+ *
+ * Previous cleanup logic is in the `updateBundleData` method which rely
on the loadData.bundleData.
+ *
+ * The reason to move the cleanup logic here is that the bundleData report
by each broker
+ * may have a delay in `LoadReportUpdaterTask`,
+ * so the view in the LoadManager of the whole cluster data may not
contain all the bundle data,
+ * which may cause active bundle-data (but not report to loadManager) to
be deleted.
+ *
+ * To avoid active bundle-data to be deleted, this method use bundle-data
in localPolicy
+ * as the truth active bundle source.
+ */
+ public void scanAndCleanupNonExistBundleData() {
+ Map<String, BundleData> loadManagerBundleData =
this.loadData.getBundleData();
+
+ Set<String> allBundles =
ConcurrentHashMap.newKeySet(loadManagerBundleData.size());
+ allBundles.addAll(this.loadData.getBundleData().keySet());
+
+ // namespace -> bundles
+ Map<String, Set<NamespaceBundle>> namespaceLocalBundleData = new
HashMap<>();
+ for (String bundleName : allBundles) {
+ final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
+ NamespaceBundle bundle =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(namespaceName, bundleRange);
+ namespaceLocalBundleData.computeIfAbsent(namespaceName, (key) ->
new HashSet<>()).add(bundle);
+ }
+
+ Set<NamespaceBundle> realClusterBundles = new HashSet<>();
+
+ // check each namespace if bundle is valid in the cluster.
+ for (Map.Entry<String, Set<NamespaceBundle>> namespaceEntry :
namespaceLocalBundleData.entrySet()) {
+ if (!isLeader()) {
+ return;
+ }
+
+ // check if namespace exist.
+ NamespaceName namespace =
NamespaceName.get(namespaceEntry.getKey());
+ Set<NamespaceBundle> loadManagerKnownBundles =
namespaceEntry.getValue();
+
+ Optional<Policies> nsPolicies;
+
+ try {
+ nsPolicies = pulsar.getPulsarResources()
+ .getNamespaceResources().getPolicies(namespace);
+ } catch (MetadataStoreException e) {
+ log.error("error when get policies for namespace {}",
namespace, e);
+ continue;
+ }
+
+ if (nsPolicies.isEmpty()) {
+ loadManagerKnownBundles.forEach((bundle) -> {
+ String bundleString = bundle.toString();
+ loadManagerBundleData.remove(bundleString);
+ if (isLeader()) {
+ log.info("namespace [{}] is not exist in cluster, "
+ + "remove load-balance bundle-data {} from
metadata store.", namespace, bundleString);
+ deleteBundleDataFromMetadataStore(bundleString);
+ }
+ });
+
+ continue;
+ }
+
+ // namespace is being deleted, do nothing because delete namespace
will handle the bundle-data deletion.
+ if (nsPolicies.get().deleted) {
+ if (log.isDebugEnabled()) {
+ log.debug("ignore namespace {} because the namespace is
mark deleted", namespace);
+ }
+ continue;
+ }
+
+ NamespaceBundles namespaceBundles =
+ pulsar.getNamespaceService()
+ .getNamespaceBundleFactory()
+ .getBundles(namespace);
+
+ if (log.isDebugEnabled()) {
+ log.debug("bundles in namespace {} is {}", namespace,
namespaceBundles.getBundles());
+ }
+
+ if (namespaceBundles == null) {
+ continue;
+ }
+
+ realClusterBundles.clear();
+ realClusterBundles.addAll(namespaceBundles.getBundles());
+
+ Sets.SetView<NamespaceBundle> alreadyNotExistBundles =
+ Sets.difference(loadManagerKnownBundles,
realClusterBundles);
+
+ alreadyNotExistBundles.forEach((bundle) -> {
Review Comment:
@heesung-sn Hi, I created a standalone instance and enabled
ModularLoadManagerImpl. The result shows that even after all sessions are
closed, the current master won't clear the bundle-data. The result can be
seen at
https://gist.github.com/lifepuzzlefun/d1181ed0eb97c82229298ce69b31daea

First I created a test topic (non-partitioned) and started a perf producer.
The bundle stats show incoming traffic. After closing all the clients of this
topic, there is no traffic on bundle 0x40000000_0x80000000, yet about 2 hours
later the bundle data is still there. Bundle 0xc0000000_0xffffffff never had
any traffic at all, and the load manager does not delete either of them.
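
For reference, a minimal sketch of the set-difference check that the diff
above performs with `Sets.difference(loadManagerKnownBundles,
realClusterBundles)`. It uses plain strings instead of `NamespaceBundle`, and
the class name, method name, and bundle ranges in `main` are hypothetical
illustrations, not code from the PR. Assuming the rest of the method only
deletes what ends up in that difference, a bundle that still exists in the
namespace is never treated as stale, however long it stays idle, which would
be consistent with the observation above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the cleanup check; not part of the PR.
class BundleCleanupSketch {

    // Returns the bundles the load manager knows about that are no longer
    // present in the namespace's current bundle list.
    static Set<String> staleBundles(Set<String> loadManagerKnown,
                                    Set<String> namespaceBundles) {
        Set<String> stale = new HashSet<>(loadManagerKnown);
        stale.removeAll(namespaceBundles);
        return stale;
    }

    public static void main(String[] args) {
        // Assumed layout: a namespace with four bundles, none of them split.
        Set<String> namespaceBundles = Set.of(
                "0x00000000_0x40000000", "0x40000000_0x80000000",
                "0x80000000_0xc0000000", "0xc0000000_0xffffffff");
        Set<String> known = Set.of(
                "0x40000000_0x80000000", "0xc0000000_0xffffffff");

        // Both idle bundles still exist, so nothing is considered stale.
        System.out.println(staleBundles(known, namespaceBundles)); // prints []

        // Assumed layout after a split of 0x40000000_0x80000000: only the
        // parent range is absent from the namespace and becomes stale.
        Set<String> afterSplit = Set.of(
                "0x00000000_0x40000000", "0x40000000_0x60000000",
                "0x60000000_0x80000000", "0x80000000_0xc0000000",
                "0xc0000000_0xffffffff");
        System.out.println(staleBundles(known, afterSplit));
        // prints [0x40000000_0x80000000]
    }
}
```

In this sketch only bundles removed by a split (or belonging to a missing
namespace) are candidates for cleanup; traffic plays no role in the decision.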