This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4e5bf4d1b9104234e9e6c8509eaded7bb19158ae
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Jan 14 20:41:01 2026 +0800

    [fix][broker] Avoid split non-existent bundle (#25031)
    
    (cherry picked from commit 38807b1511ba3b8c150d69c16a0c3ae36f321dac)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 21 ++++++++-
 .../pulsar/common/naming/NamespaceBundles.java     |  2 +-
 .../impl/ModularLoadManagerImplTest.java           | 54 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index f86b608d937..1f549cbb66e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -365,6 +366,16 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         return future;
     }
 
+    private boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles 
namespaceBundles,
+                                                           NamespaceBundle 
bundleRange) {
+        try {
+            namespaceBundles.validateBundle(bundleRange); // despite its name, bundleRange is a NamespaceBundle; validateBundle throws IllegalArgumentException when validation fails
+            return true; // validation passed: the bundle is accepted by namespaceBundles
+        } catch (IllegalArgumentException e) {
+            return false; // validation failure is reported as "does not exist" rather than propagated to the caller
+        }
+    }
+
     // Attempt to local the data for the given bundle in metadata store
     // If it cannot be found, return the default bundle data.
     @Override
@@ -751,8 +762,14 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                 try {
                     final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
                     final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
-                    if (!namespaceBundleFactory
-                            
.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
+                    NamespaceBundle bundle = 
namespaceBundleFactory.getBundle(namespaceName, bundleRange);
+                    if (!namespaceBundleFactory.canSplitBundle(bundle)) {
+                        continue;
+                    }
+
+                    NamespaceBundles bundles = 
namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName));
+                    if (!checkBundleDataExistInNamespaceBundles(bundles, 
bundle)) {
+                        log.warn("Bundle {} has been removed, skip split this 
bundle ", bundleName);
                         continue;
                     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index 27c73edc6b5..c298eb8aa36 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -106,7 +106,7 @@ public class NamespaceBundles {
         return bundles.size();
     }
 
-    public void validateBundle(NamespaceBundle nsBundle) throws Exception {
+    public void validateBundle(NamespaceBundle nsBundle) throws 
IllegalArgumentException {
         int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
         checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", 
nsBundle);
         NamespaceBundle foundBundle = bundles.get(idx);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index ad07dbfa217..577a8c19485 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -40,6 +40,7 @@ import com.google.common.hash.Hashing;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -70,6 +71,7 @@ import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -1130,4 +1132,56 @@ public class ModularLoadManagerImplTest {
         
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
     }
 
+    @Test
+    public void testRepeatSplitBundle() throws Exception {
+        final String cluster = "use";
+        final String tenant = "my-tenant";
+        final String namespace = "repeat-split-bundle";
+        final String topicName = tenant + "/" + namespace + "/" + "topic";
+        int bundleNumbers = 8;
+
+        admin1.clusters().createCluster(cluster, ClusterData.builder()
+            .serviceUrl(pulsar1.getWebServiceAddress()).build());
+        admin1.tenants().createTenant(tenant,
+            new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet(cluster)));
+        admin1.namespaces().createNamespace(tenant + "/" + namespace, 
bundleNumbers);
+
+        LoadData loadData = (LoadData) getField(primaryLoadManager, 
"loadData");
+        LocalBrokerData localData = (LocalBrokerData) 
getField(primaryLoadManager, "localData");
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
+
+        // Create many partitioned topics so that they are distributed across the namespace's bundles.
+        List<Consumer> consumers = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String topicNameI = topicName + i;
+            admin1.topics().createPartitionedTopic(topicNameI, 20);
+            // Subscribe a consumer to the topic to trigger bundle assignment.
+
+            Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicNameI)
+                .subscriptionName("my-subscriber-name2").subscribe();
+            consumers.add(consumer);
+        }
+
+        String topicToFindBundle = topicName + 0;
+        NamespaceBundle realBundle = 
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
+        String bundleKey = realBundle.toString();
+        log.info("Before bundle={}", bundleKey);
+
+        NamespaceBundleStats stats = new NamespaceBundleStats();
+        stats.msgRateIn = 100000.0; // NOTE(review): presumably high enough to exceed the bundle split threshold - confirm
+        localData.getLastStats().put(bundleKey, stats);
+        pulsar1.getBrokerService().updateRates();
+
+        primaryLoadManager.updateAll(); // NOTE(review): presumably this pass splits the overloaded bundle - confirm
+
+        primaryLoadManager.updateAll(); // NOTE(review): presumably the repeat pass must skip the already-split bundle - confirm
+        Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey)); // the original bundle key must no longer be present in the load data
+
+        for (Consumer consumer : consumers) {
+            consumer.close();
+        }
+    }
+
 }

Reply via email to