This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 102254e0672ff86b23a73585c597a0433c57a54a Author: Cong Zhao <[email protected]> AuthorDate: Wed Jan 14 20:41:01 2026 +0800 [fix][broker] Avoid split non-existent bundle (#25031) (cherry picked from commit 38807b1511ba3b8c150d69c16a0c3ae36f321dac) --- .../loadbalance/impl/ModularLoadManagerImpl.java | 21 ++++++++- .../pulsar/common/naming/NamespaceBundles.java | 2 +- .../impl/ModularLoadManagerImplTest.java | 54 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 75c60e26879..a9d7ddd78e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -67,6 +67,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -368,6 +369,16 @@ public class ModularLoadManagerImpl implements ModularLoadManager { return future; } + private boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles, + NamespaceBundle bundleRange) { + try { + namespaceBundles.validateBundle(bundleRange); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + // Attempt to local the data for the given bundle in metadata store // If it cannot be found, return the default bundle data. @Override @@ -762,8 +773,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager { try { final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName); final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName); - if (!namespaceBundleFactory - .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) { + NamespaceBundle bundle = namespaceBundleFactory.getBundle(namespaceName, bundleRange); + if (!namespaceBundleFactory.canSplitBundle(bundle)) { + continue; + } + + NamespaceBundles bundles = namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName)); + if (!checkBundleDataExistInNamespaceBundles(bundles, bundle)) { + log.warn("Bundle {} has been removed, skip split this bundle ", bundleName); continue; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index 27c73edc6b5..c298eb8aa36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -106,7 +106,7 @@ public class NamespaceBundles { return bundles.size(); } - public void validateBundle(NamespaceBundle nsBundle) throws Exception { + public void validateBundle(NamespaceBundle nsBundle) throws IllegalArgumentException { int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint()); checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle); NamespaceBundle foundBundle = bundles.get(idx); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index ad07dbfa217..577a8c19485 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -40,6 +40,7 @@ import com.google.common.hash.Hashing; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.URL; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -70,6 +71,7 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -1130,4 +1132,56 @@ public class ModularLoadManagerImplTest { assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange())); } + @Test + public void testRepeatSplitBundle() throws Exception { + final String cluster = "use"; + final String tenant = "my-tenant"; + final String namespace = "repeat-split-bundle"; + final String topicName = tenant + "/" + namespace + "/" + "topic"; + int bundleNumbers = 8; + + admin1.clusters().createCluster(cluster, ClusterData.builder() + .serviceUrl(pulsar1.getWebServiceAddress()).build()); + admin1.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster))); + admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers); + + LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData"); + LocalBrokerData localData = (LocalBrokerData) getField(primaryLoadManager, "localData"); + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build(); + + // create a lot of topic to fully distributed among bundles. + List<Consumer> consumers = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String topicNameI = topicName + i; + admin1.topics().createPartitionedTopic(topicNameI, 20); + // trigger bundle assignment + + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicNameI) + .subscriptionName("my-subscriber-name2").subscribe(); + consumers.add(consumer); + } + + String topicToFindBundle = topicName + 0; + NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle)); + String bundleKey = realBundle.toString(); + log.info("Before bundle={}", bundleKey); + + NamespaceBundleStats stats = new NamespaceBundleStats(); + stats.msgRateIn = 100000.0; + localData.getLastStats().put(bundleKey, stats); + pulsar1.getBrokerService().updateRates(); + + primaryLoadManager.updateAll(); + + primaryLoadManager.updateAll(); + Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey)); + + for (Consumer consumer : consumers) { + consumer.close(); + } + } + }
