This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 14dd76f2e3d25b0a7e515a5557593243acc9db35 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Thu May 11 22:17:06 2023 +0800 Revert "[improve] [broker] Skip split boundle if only one broker (#20190)" This reverts commit 7da787378089e0142a5235a15d0915925c177edd. --- .../loadbalance/impl/ModularLoadManagerImpl.java | 2 +- .../pulsar/broker/stats/PrometheusMetricsTest.java | 10 +--- .../pulsar/client/api/BrokerServiceLookupTest.java | 58 ---------------------- 3 files changed, 2 insertions(+), 68 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 293ff2760f0..d81f6949f43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -700,7 +700,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { public void checkNamespaceBundleSplit() { if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null - || !pulsar.getLeaderElectionService().isLeader() || loadData.getBrokerData().size() <= 1) { + || !pulsar.getLeaderElectionService().isLeader()) { return; } final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 1f21706f7dc..473adc26daa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -36,7 +36,6 @@ import java.nio.charset.StandardCharsets; import java.text.NumberFormat; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -75,7 +74,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.compaction.Compactor; -import org.apache.zookeeper.CreateMode; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -550,10 +548,6 @@ public class PrometheusMetricsTest extends BrokerTestBase { c1.acknowledge(c1.receive()); } - // Mock another broker to make split task work. - String mockedBroker = "/loadbalance/brokers/127.0.0.1:0"; - mockZooKeeper.create(mockedBroker, new byte[]{0}, Collections.emptyList(), CreateMode.EPHEMERAL); - pulsar.getBrokerService().updateRates(); Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0)); ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper)pulsar.getLoadManager().get(); @@ -577,9 +571,7 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertTrue(metrics.containsKey("pulsar_lb_bandwidth_in_usage")); assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage")); - assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total")); - - mockZooKeeper.delete(mockedBroker, 0); + assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count")); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 396fda82b59..e3bb6a92dc0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -66,7 +66,6 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import lombok.Cleanup; -import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -79,7 +78,6 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -764,62 +762,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { } } - @Test(timeOut = 20000) - public void testSkipSplitBundleIfOnlyOneBroker() throws Exception { - - log.info("-- Starting {} test --", methodName); - final String loadBalancerName = conf.getLoadManagerClassName(); - final int defaultNumberOfNamespaceBundles = conf.getDefaultNumberOfNamespaceBundles(); - final int loadBalancerNamespaceBundleMaxTopics = conf.getLoadBalancerNamespaceBundleMaxTopics(); - - final String namespace = "my-property/my-ns"; - final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); - final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); - try { - // configure broker with ModularLoadManager. - stopBroker(); - conf.setDefaultNumberOfNamespaceBundles(1); - conf.setLoadBalancerNamespaceBundleMaxTopics(1); - conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); - startBroker(); - final ModularLoadManagerWrapper modularLoadManagerWrapper = - (ModularLoadManagerWrapper) pulsar.getLoadManager().get(); - final ModularLoadManagerImpl modularLoadManager = - (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager(); - - // Create one topic and trigger tasks, then verify there is only one bundle now. - Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1) - .subscriptionName("my-subscriber-name").subscribe(); - List<NamespaceBundle> bounldes1 = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getBundles(NamespaceName.get(namespace)).getBundles(); - pulsar.getBrokerService().updateRates(); - pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); - pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper(); - modularLoadManager.updateAll(); - assertEquals(bounldes1.size(), 1); - - // Create the second topic and trigger tasks, then verify the split task will be skipped. - Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2) - .subscriptionName("my-subscriber-name").subscribe(); - pulsar.getBrokerService().updateRates(); - pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); - pulsar.getLoadManager().get().writeResourceQuotasToZooKeeper(); - modularLoadManager.updateAll(); - List<NamespaceBundle> bounldes2 = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getBundles(NamespaceName.get(namespace)).getBundles(); - assertEquals(bounldes2.size(), 1); - - consumer1.close(); - consumer2.close(); - admin.topics().delete(topicName1, false); - admin.topics().delete(topicName2, false); - } finally { - conf.setDefaultNumberOfNamespaceBundles(defaultNumberOfNamespaceBundles); - conf.setLoadBalancerNamespaceBundleMaxTopics(loadBalancerNamespaceBundleMaxTopics); - conf.setLoadManagerClassName(loadBalancerName); - } - } - @Test(timeOut = 10000) public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {