This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 89dafa5e031 [improve] [broker] Skip split bundle if only one broker 
(#20190)
89dafa5e031 is described below

commit 89dafa5e03129b4f35db31232fd816acd454cf11
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Sat Apr 29 01:54:05 2023 +0800

    [improve] [broker] Skip split bundle if only one broker (#20190)
    
    Co-authored-by: Zixuan Liu <node...@gmail.com>
    (cherry picked from commit d135c4a115038dc61f8fe2d230cb1f0c02239f92)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  2 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  9 ++++
 .../pulsar/client/api/BrokerServiceLookupTest.java | 58 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c6f406a7c8a..30a2ef5cdf2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -744,7 +744,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     public void checkNamespaceBundleSplit() {
 
         if (!conf.isLoadBalancerAutoBundleSplitEnabled() || 
pulsar.getLeaderElectionService() == null
-                || !pulsar.getLeaderElectionService().isLeader()) {
+                || !pulsar.getLeaderElectionService().isLeader() || 
knownBrokers.size() <= 1) {
             return;
         }
         final boolean unloadSplitBundles = 
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index f5ae8459f18..bf141e10aa1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -38,6 +38,7 @@ import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -80,6 +81,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -772,6 +774,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
             c1.acknowledge(c1.receive());
         }
 
+        // Mock another broker to make split task work.
+        String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
+        mockZooKeeper.create(mockedBroker, new byte[]{0}, 
Collections.emptyList(), CreateMode.EPHEMERAL);
+
         pulsar.getBrokerService().updateRates();
         Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
         ModularLoadManagerWrapper loadManager = 
(ModularLoadManagerWrapper)pulsar.getLoadManager().get();
@@ -796,6 +802,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));
 
         assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
+
+        // cleanup.
+        mockZooKeeper.delete(mockedBroker, 0);
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 2af9f450dd3..8597e0a8799 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -67,6 +67,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -80,6 +81,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -781,6 +783,62 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void testSkipSplitBundleIfOnlyOneBroker() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+        final String loadBalancerName = conf.getLoadManagerClassName();
+        final int defaultNumberOfNamespaceBundles = 
conf.getDefaultNumberOfNamespaceBundles();
+        final int loadBalancerNamespaceBundleMaxTopics = 
conf.getLoadBalancerNamespaceBundleMaxTopics();
+
+        final String namespace = "my-property/my-ns";
+        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
+        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
+        try {
+            // configure broker with ModularLoadManager.
+            stopBroker();
+            conf.setDefaultNumberOfNamespaceBundles(1);
+            conf.setLoadBalancerNamespaceBundleMaxTopics(1);
+            
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+            startBroker();
+            final ModularLoadManagerWrapper modularLoadManagerWrapper =
+                    (ModularLoadManagerWrapper) pulsar.getLoadManager().get();
+            final ModularLoadManagerImpl modularLoadManager =
+                    (ModularLoadManagerImpl) 
modularLoadManagerWrapper.getLoadManager();
+
+            // Create one topic and trigger tasks, then verify there is only 
one bundle now.
+            Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName1)
+                    .subscriptionName("my-subscriber-name").subscribe();
+            List<NamespaceBundle> bounldes1 = 
pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundles(NamespaceName.get(namespace)).getBundles();
+            pulsar.getBrokerService().updateRates();
+            pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
+            pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
+            modularLoadManager.updateAll();
+            assertEquals(bounldes1.size(), 1);
+
+            // Create the second topic and trigger tasks, then verify the 
split task will be skipped.
+            Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName2)
+                    .subscriptionName("my-subscriber-name").subscribe();
+            pulsar.getBrokerService().updateRates();
+            pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
+            pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
+            modularLoadManager.updateAll();
+            List<NamespaceBundle> bounldes2 = 
pulsar.getNamespaceService().getNamespaceBundleFactory()
+                    .getBundles(NamespaceName.get(namespace)).getBundles();
+            assertEquals(bounldes2.size(), 1);
+
+            consumer1.close();
+            consumer2.close();
+            admin.topics().delete(topicName1, false);
+            admin.topics().delete(topicName2, false);
+        } finally {
+            
conf.setDefaultNumberOfNamespaceBundles(defaultNumberOfNamespaceBundles);
+            
conf.setLoadBalancerNamespaceBundleMaxTopics(loadBalancerNamespaceBundleMaxTopics);
+            conf.setLoadManagerClassName(loadBalancerName);
+        }
+    }
+
     @Test(timeOut = 10000)
     public void testPartitionedMetadataWithDeprecatedVersion() throws 
Exception {
 

Reply via email to