This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e285ff5cb91 [fix][broker] fix ModularLoadManagerImpl always delete 
active bundle-data. sec ver. (#20620)
e285ff5cb91 is described below

commit e285ff5cb91e5327242174613ff2288042ee7120
Author: lifepuzzlefun <[email protected]>
AuthorDate: Mon Aug 21 10:39:28 2023 +0800

    [fix][broker] fix ModularLoadManagerImpl always delete active bundle-data. 
sec ver. (#20620)
    
    Co-authored-by: wangjinlong <[email protected]>
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  21 ++--
 .../impl/ModularLoadManagerImplTest.java           | 119 +++++++++++++++++++++
 2 files changed, 129 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index aa3b3c49646..7b8fc29e095 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -561,17 +561,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                     bundleData.put(bundle, currentBundleData);
                 }
             }
-
-            //Remove not active bundle from loadData
-            for (String bundle : bundleData.keySet()) {
-                if (!activeBundles.contains(bundle)){
-                    bundleData.remove(bundle);
-                    if (pulsar.getLeaderElectionService().isLeader()){
-                        deleteBundleDataFromMetadataStore(bundle);
-                    }
-                }
-            }
-
             // Remove all loaded bundles from the preallocated maps.
             final Map<String, BundleData> preallocatedBundleData = 
brokerData.getPreallocatedBundleData();
             Set<String> ownedNsBundles = 
pulsar.getNamespaceService().getOwnedServiceUnits()
@@ -606,6 +595,16 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                 
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), 
namespaceToBundleRange);
             }
         }
+
+        // Remove not active bundle from loadData
+        for (String bundle : bundleData.keySet()) {
+            if (!activeBundles.contains(bundle)){
+                bundleData.remove(bundle);
+                if (pulsar.getLeaderElectionService().isLeader()){
+                    deleteBundleDataFromMetadataStore(bundle);
+                }
+            }
+        }
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 3fb62f486ab..786c9027c94 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -45,7 +45,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -63,17 +65,22 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -782,4 +789,116 @@ public class ModularLoadManagerImplTest {
         Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar1.getLeaderElectionService().isLeader()));
         assertEquals(data.size(), 1);
     }
+
+
+    @Test
+    public void testRemoveNonExistBundleData()
+            throws PulsarAdminException, InterruptedException,
+            PulsarClientException, PulsarServerException, 
NoSuchFieldException, IllegalAccessException {
+        final String cluster = "use";
+        final String tenant = "my-tenant";
+        final String namespace = "remove-non-exist-bundle-data-ns";
+        final String topicName = tenant + "/" + namespace + "/" + "topic";
+        int bundleNumbers = 8;
+
+        admin1.clusters().createCluster(cluster, 
ClusterData.builder().serviceUrl("http://" + 
pulsar1.getAdvertisedAddress()).build());
+        admin1.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet(cluster)));
+        admin1.namespaces().createNamespace(tenant + "/" + namespace, 
bundleNumbers);
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
+
+        // create a lot of topic to fully distributed among bundles.
+        for (int i = 0; i < 10; i++) {
+            String topicNameI = topicName + i;
+            admin1.topics().createPartitionedTopic(topicNameI, 20);
+            // trigger bundle assignment
+
+            pulsarClient.newConsumer().topic(topicNameI)
+                    .subscriptionName("my-subscriber-name2").subscribe();
+        }
+
+        ModularLoadManagerWrapper loadManagerWrapper = 
(ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
+        ModularLoadManagerImpl lm1 = (ModularLoadManagerImpl) 
loadManagerWrapper.getLoadManager();
+        ModularLoadManagerWrapper loadManager2 = (ModularLoadManagerWrapper) 
pulsar2.getLoadManager().get();
+        ModularLoadManagerImpl lm2 = (ModularLoadManagerImpl) 
loadManager2.getLoadManager();
+
+        Field executors = lm1.getClass().getDeclaredField("executors");
+        executors.setAccessible(true);
+        ExecutorService executorService = (ExecutorService) executors.get(lm1);
+
+        assertEquals(lm1.getAvailableBrokers().size(), 2);
+
+        pulsar1.getBrokerService().updateRates();
+        pulsar2.getBrokerService().updateRates();
+
+        lm1.writeBrokerDataOnZooKeeper(true);
+        lm2.writeBrokerDataOnZooKeeper(true);
+
+        // wait for metadata store notification finish
+        CountDownLatch latch = new CountDownLatch(1);
+        executorService.submit(latch::countDown);
+        latch.await();
+
+        loadManagerWrapper.writeResourceQuotasToZooKeeper();
+
+        MetadataCache<BundleData> bundlesCache = 
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class);
+
+        // trigger bundle split
+        String topicToFindBundle = topicName + 0;
+        NamespaceBundle bundleWillBeSplit = 
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
+
+        String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/" 
+ tenant + "/" + namespace;
+        CompletableFuture<List<String>> children = 
bundlesCache.getChildren(bundleDataPath);
+        List<String> bundles = children.join();
+        assertTrue(bundles.contains(bundleWillBeSplit.getBundleRange()));
+
+        // after updateAll no namespace bundle data is deleted from metadata 
store.
+        lm1.updateAll();
+
+        children = bundlesCache.getChildren(bundleDataPath);
+        bundles = children.join();
+        assertFalse(bundles.isEmpty());
+        assertEquals(bundleNumbers, bundles.size());
+
+        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+        pulsar1.getAdminClient().namespaces().splitNamespaceBundle(tenant + 
"/" + namespace,
+                bundleWillBeSplit.getBundleRange(),
+                false, 
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
+
+        NamespaceBundles allBundlesAfterSplit =
+                pulsar1.getNamespaceService().getNamespaceBundleFactory()
+                        .getBundles(namespaceName);
+
+        
assertFalse(allBundlesAfterSplit.getBundles().contains(bundleWillBeSplit));
+
+        // the bundle data should be deleted
+
+        pulsar1.getBrokerService().updateRates();
+        pulsar2.getBrokerService().updateRates();
+
+        lm1.writeBrokerDataOnZooKeeper(true);
+        lm2.writeBrokerDataOnZooKeeper(true);
+
+        latch = new CountDownLatch(1);
+        // wait for metadata store notification finish
+        CountDownLatch finalLatch = latch;
+        executorService.submit(finalLatch::countDown);
+        latch.await();
+
+        loadManagerWrapper.writeResourceQuotasToZooKeeper();
+
+        lm1.updateAll();
+
+        log.info("update all triggered.");
+
+        // check bundle data should be deleted from metadata store.
+
+        CompletableFuture<List<String>> childrenAfterSplit = 
bundlesCache.getChildren(bundleDataPath);
+        List<String> bundlesAfterSplit = childrenAfterSplit.join();
+
+        
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
+    }
+
 }

Reply via email to