This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 89dafa5e031 [improve] [broker] Skip split boundle if only one broker (#20190) 89dafa5e031 is described below commit 89dafa5e03129b4f35db31232fd816acd454cf11 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Sat Apr 29 01:54:05 2023 +0800 [improve] [broker] Skip split boundle if only one broker (#20190) Co-authored-by: Zixuan Liu <node...@gmail.com> (cherry picked from commit d135c4a115038dc61f8fe2d230cb1f0c02239f92) --- .../loadbalance/impl/ModularLoadManagerImpl.java | 2 +- .../pulsar/broker/stats/PrometheusMetricsTest.java | 9 ++++ .../pulsar/client/api/BrokerServiceLookupTest.java | 58 ++++++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index c6f406a7c8a..30a2ef5cdf2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -744,7 +744,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { public void checkNamespaceBundleSplit() { if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null - || !pulsar.getLeaderElectionService().isLeader()) { + || !pulsar.getLeaderElectionService().isLeader() || knownBrokers.size() <= 1) { return; } final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index f5ae8459f18..bf141e10aa1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -38,6 +38,7 @@ import java.text.NumberFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -80,6 +81,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.compaction.Compactor; +import org.apache.zookeeper.CreateMode; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; @@ -772,6 +774,10 @@ public class PrometheusMetricsTest extends BrokerTestBase { c1.acknowledge(c1.receive()); } + // Mock another broker to make split task work. + String mockedBroker = "/loadbalance/brokers/127.0.0.1:0"; + mockZooKeeper.create(mockedBroker, new byte[]{0}, Collections.emptyList(), CreateMode.EPHEMERAL); + pulsar.getBrokerService().updateRates(); Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0)); ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper)pulsar.getLoadManager().get(); @@ -796,6 +802,9 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage")); assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total")); + + // cleanup. + mockZooKeeper.delete(mockedBroker, 0); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 2af9f450dd3..8597e0a8799 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -67,6 +67,7 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -80,6 +81,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -781,6 +783,62 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { } } + @Test(timeOut = 20000) + public void testSkipSplitBundleIfOnlyOneBroker() throws Exception { + + log.info("-- Starting {} test --", methodName); + final String loadBalancerName = conf.getLoadManagerClassName(); + final int defaultNumberOfNamespaceBundles = conf.getDefaultNumberOfNamespaceBundles(); + final int loadBalancerNamespaceBundleMaxTopics = conf.getLoadBalancerNamespaceBundleMaxTopics(); + + final String namespace = "my-property/my-ns"; + final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); + final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); + try { + // configure broker with ModularLoadManager. + stopBroker(); + conf.setDefaultNumberOfNamespaceBundles(1); + conf.setLoadBalancerNamespaceBundleMaxTopics(1); + conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + startBroker(); + final ModularLoadManagerWrapper modularLoadManagerWrapper = + (ModularLoadManagerWrapper) pulsar.getLoadManager().get(); + final ModularLoadManagerImpl modularLoadManager = + (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager(); + + // Create one topic and trigger tasks, then verify there is only one bundle now. + Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1) + .subscriptionName("my-subscriber-name").subscribe(); + List<NamespaceBundle> bounldes1 = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getBundles(NamespaceName.get(namespace)).getBundles(); + pulsar.getBrokerService().updateRates(); + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper(); + modularLoadManager.updateAll(); + assertEquals(bounldes1.size(), 1); + + // Create the second topic and trigger tasks, then verify the split task will be skipped. + Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2) + .subscriptionName("my-subscriber-name").subscribe(); + pulsar.getBrokerService().updateRates(); + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper(); + modularLoadManager.updateAll(); + List<NamespaceBundle> bounldes2 = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getBundles(NamespaceName.get(namespace)).getBundles(); + assertEquals(bounldes2.size(), 1); + + consumer1.close(); + consumer2.close(); + admin.topics().delete(topicName1, false); + admin.topics().delete(topicName2, false); + } finally { + conf.setDefaultNumberOfNamespaceBundles(defaultNumberOfNamespaceBundles); + conf.setLoadBalancerNamespaceBundleMaxTopics(loadBalancerNamespaceBundleMaxTopics); + conf.setLoadManagerClassName(loadBalancerName); + } + } + @Test(timeOut = 10000) public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {