This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d135c4a1150 [improve] [broker] Skip split bundle if only one broker
(#20190)
d135c4a1150 is described below
commit d135c4a115038dc61f8fe2d230cb1f0c02239f92
Author: fengyubiao <[email protected]>
AuthorDate: Sat Apr 29 01:54:05 2023 +0800
[improve] [broker] Skip split bundle if only one broker (#20190)
Co-authored-by: Zixuan Liu <[email protected]>
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 2 +-
.../pulsar/broker/stats/PrometheusMetricsTest.java | 9 ++++
.../pulsar/client/api/BrokerServiceLookupTest.java | 58 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c6f406a7c8a..30a2ef5cdf2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -744,7 +744,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
public void checkNamespaceBundleSplit() {
if (!conf.isLoadBalancerAutoBundleSplitEnabled() ||
pulsar.getLeaderElectionService() == null
- || !pulsar.getLeaderElectionService().isLeader()) {
+ || !pulsar.getLeaderElectionService().isLeader() ||
knownBrokers.size() <= 1) {
return;
}
final boolean unloadSplitBundles =
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index f5ae8459f18..bf141e10aa1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -38,6 +38,7 @@ import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -80,6 +81,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.CreateMode;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -772,6 +774,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
c1.acknowledge(c1.receive());
}
+ // Mock another broker to make split task work.
+ String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
+ mockZooKeeper.create(mockedBroker, new byte[]{0},
Collections.emptyList(), CreateMode.EPHEMERAL);
+
pulsar.getBrokerService().updateRates();
Awaitility.await().untilAsserted(() ->
assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
ModularLoadManagerWrapper loadManager =
(ModularLoadManagerWrapper)pulsar.getLoadManager().get();
@@ -796,6 +802,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));
assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
+
+ // cleanup.
+ mockZooKeeper.delete(mockedBroker, 0);
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 2af9f450dd3..8597e0a8799 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -67,6 +67,7 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -80,6 +81,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -781,6 +783,62 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
}
}
+ @Test(timeOut = 20000)
+ public void testSkipSplitBundleIfOnlyOneBroker() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+ final String loadBalancerName = conf.getLoadManagerClassName();
+ final int defaultNumberOfNamespaceBundles =
conf.getDefaultNumberOfNamespaceBundles();
+ final int loadBalancerNamespaceBundleMaxTopics =
conf.getLoadBalancerNamespaceBundleMaxTopics();
+
+ final String namespace = "my-property/my-ns";
+ final String topicName1 = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/tp_");
+ final String topicName2 = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/tp_");
+ try {
+ // configure broker with ModularLoadManager.
+ stopBroker();
+ conf.setDefaultNumberOfNamespaceBundles(1);
+ conf.setLoadBalancerNamespaceBundleMaxTopics(1);
+
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ startBroker();
+ final ModularLoadManagerWrapper modularLoadManagerWrapper =
+ (ModularLoadManagerWrapper) pulsar.getLoadManager().get();
+ final ModularLoadManagerImpl modularLoadManager =
+ (ModularLoadManagerImpl)
modularLoadManagerWrapper.getLoadManager();
+
+ // Create one topic and trigger tasks, then verify there is only
one bundle now.
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topic(topicName1)
+ .subscriptionName("my-subscriber-name").subscribe();
+ List<NamespaceBundle> bounldes1 =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getBundles(NamespaceName.get(namespace)).getBundles();
+ pulsar.getBrokerService().updateRates();
+ pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
+ pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
+ modularLoadManager.updateAll();
+ assertEquals(bounldes1.size(), 1);
+
+ // Create the second topic and trigger tasks, then verify the
split task will be skipped.
+ Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topic(topicName2)
+ .subscriptionName("my-subscriber-name").subscribe();
+ pulsar.getBrokerService().updateRates();
+ pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
+ pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper();
+ modularLoadManager.updateAll();
+ List<NamespaceBundle> bounldes2 =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getBundles(NamespaceName.get(namespace)).getBundles();
+ assertEquals(bounldes2.size(), 1);
+
+ consumer1.close();
+ consumer2.close();
+ admin.topics().delete(topicName1, false);
+ admin.topics().delete(topicName2, false);
+ } finally {
+
conf.setDefaultNumberOfNamespaceBundles(defaultNumberOfNamespaceBundles);
+
conf.setLoadBalancerNamespaceBundleMaxTopics(loadBalancerNamespaceBundleMaxTopics);
+ conf.setLoadManagerClassName(loadBalancerName);
+ }
+ }
+
@Test(timeOut = 10000)
public void testPartitionedMetadataWithDeprecatedVersion() throws
Exception {