This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e285ff5cb91 [fix][broker] fix ModularLoadManagerImpl always delete
active bundle-data. sec ver. (#20620)
e285ff5cb91 is described below
commit e285ff5cb91e5327242174613ff2288042ee7120
Author: lifepuzzlefun <[email protected]>
AuthorDate: Mon Aug 21 10:39:28 2023 +0800
[fix][broker] fix ModularLoadManagerImpl always delete active bundle-data.
sec ver. (#20620)
Co-authored-by: wangjinlong <[email protected]>
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 21 ++--
.../impl/ModularLoadManagerImplTest.java | 119 +++++++++++++++++++++
2 files changed, 129 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index aa3b3c49646..7b8fc29e095 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -561,17 +561,6 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
bundleData.put(bundle, currentBundleData);
}
}
-
- //Remove not active bundle from loadData
- for (String bundle : bundleData.keySet()) {
- if (!activeBundles.contains(bundle)){
- bundleData.remove(bundle);
- if (pulsar.getLeaderElectionService().isLeader()){
- deleteBundleDataFromMetadataStore(bundle);
- }
- }
- }
-
// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData =
brokerData.getPreallocatedBundleData();
Set<String> ownedNsBundles =
pulsar.getNamespaceService().getOwnedServiceUnits()
@@ -606,6 +595,16 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(),
namespaceToBundleRange);
}
}
+
+ // Remove not active bundle from loadData
+ for (String bundle : bundleData.keySet()) {
+ if (!activeBundles.contains(bundle)){
+ bundleData.remove(bundle);
+ if (pulsar.getLeaderElectionService().isLeader()){
+ deleteBundleDataFromMetadataStore(bundle);
+ }
+ }
+ }
}
/**
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 3fb62f486ab..786c9027c94 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -45,7 +45,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -63,17 +65,22 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -782,4 +789,116 @@ public class ModularLoadManagerImplTest {
Awaitility.await().untilAsserted(() ->
assertTrue(pulsar1.getLeaderElectionService().isLeader()));
assertEquals(data.size(), 1);
}
+
+
+ @Test
+ public void testRemoveNonExistBundleData()
+ throws PulsarAdminException, InterruptedException,
+ PulsarClientException, PulsarServerException,
NoSuchFieldException, IllegalAccessException {
+ final String cluster = "use";
+ final String tenant = "my-tenant";
+ final String namespace = "remove-non-exist-bundle-data-ns";
+ final String topicName = tenant + "/" + namespace + "/" + "topic";
+ int bundleNumbers = 8;
+
+ admin1.clusters().createCluster(cluster,
ClusterData.builder().serviceUrl("http://" +
pulsar1.getAdvertisedAddress()).build());
+ admin1.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet(cluster)));
+ admin1.namespaces().createNamespace(tenant + "/" + namespace,
bundleNumbers);
+
+ @Cleanup
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
+
+ // create a lot of topic to fully distributed among bundles.
+ for (int i = 0; i < 10; i++) {
+ String topicNameI = topicName + i;
+ admin1.topics().createPartitionedTopic(topicNameI, 20);
+ // trigger bundle assignment
+
+ pulsarClient.newConsumer().topic(topicNameI)
+ .subscriptionName("my-subscriber-name2").subscribe();
+ }
+
+ ModularLoadManagerWrapper loadManagerWrapper =
(ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
+ ModularLoadManagerImpl lm1 = (ModularLoadManagerImpl)
loadManagerWrapper.getLoadManager();
+ ModularLoadManagerWrapper loadManager2 = (ModularLoadManagerWrapper)
pulsar2.getLoadManager().get();
+ ModularLoadManagerImpl lm2 = (ModularLoadManagerImpl)
loadManager2.getLoadManager();
+
+ Field executors = lm1.getClass().getDeclaredField("executors");
+ executors.setAccessible(true);
+ ExecutorService executorService = (ExecutorService) executors.get(lm1);
+
+ assertEquals(lm1.getAvailableBrokers().size(), 2);
+
+ pulsar1.getBrokerService().updateRates();
+ pulsar2.getBrokerService().updateRates();
+
+ lm1.writeBrokerDataOnZooKeeper(true);
+ lm2.writeBrokerDataOnZooKeeper(true);
+
+ // wait for metadata store notification finish
+ CountDownLatch latch = new CountDownLatch(1);
+ executorService.submit(latch::countDown);
+ latch.await();
+
+ loadManagerWrapper.writeResourceQuotasToZooKeeper();
+
+ MetadataCache<BundleData> bundlesCache =
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class);
+
+ // trigger bundle split
+ String topicToFindBundle = topicName + 0;
+ NamespaceBundle bundleWillBeSplit =
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
+
+ String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/"
+ tenant + "/" + namespace;
+ CompletableFuture<List<String>> children =
bundlesCache.getChildren(bundleDataPath);
+ List<String> bundles = children.join();
+ assertTrue(bundles.contains(bundleWillBeSplit.getBundleRange()));
+
+ // after updateAll no namespace bundle data is deleted from metadata
store.
+ lm1.updateAll();
+
+ children = bundlesCache.getChildren(bundleDataPath);
+ bundles = children.join();
+ assertFalse(bundles.isEmpty());
+ assertEquals(bundleNumbers, bundles.size());
+
+ NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+ pulsar1.getAdminClient().namespaces().splitNamespaceBundle(tenant +
"/" + namespace,
+ bundleWillBeSplit.getBundleRange(),
+ false,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
+
+ NamespaceBundles allBundlesAfterSplit =
+ pulsar1.getNamespaceService().getNamespaceBundleFactory()
+ .getBundles(namespaceName);
+
+
assertFalse(allBundlesAfterSplit.getBundles().contains(bundleWillBeSplit));
+
+ // the bundle data should be deleted
+
+ pulsar1.getBrokerService().updateRates();
+ pulsar2.getBrokerService().updateRates();
+
+ lm1.writeBrokerDataOnZooKeeper(true);
+ lm2.writeBrokerDataOnZooKeeper(true);
+
+ latch = new CountDownLatch(1);
+ // wait for metadata store notification finish
+ CountDownLatch finalLatch = latch;
+ executorService.submit(finalLatch::countDown);
+ latch.await();
+
+ loadManagerWrapper.writeResourceQuotasToZooKeeper();
+
+ lm1.updateAll();
+
+ log.info("update all triggered.");
+
+ // check bundle data should be deleted from metadata store.
+
+ CompletableFuture<List<String>> childrenAfterSplit =
bundlesCache.getChildren(bundleDataPath);
+ List<String> bundlesAfterSplit = childrenAfterSplit.join();
+
+
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
+ }
+
}