This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 86206106f07dfd8e3b99c63b5f289e8b18d02593 Author: Jiwei Guo <[email protected]> AuthorDate: Wed Jan 3 22:01:28 2024 +0800 [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21842) (cherry picked from commit e10d318d60aab55532ab256a705a90780354cdc6) --- .../impl/ModularLoadManagerWrapper.java | 4 +-- .../impl/ModularLoadManagerImplTest.java | 31 +++++++++++++++++----- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index c61d39cf315..63bc7ab07fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -78,8 +78,8 @@ public class ModularLoadManagerWrapper implements LoadManager { private String getBrokerWebServiceUrl(String broker) { LocalBrokerData localData = (loadManager).getBrokerLocalData(broker); if (localData != null) { - return localData.getWebServiceUrl() != null ? localData.getWebServiceUrl() - : localData.getWebServiceUrlTls(); + return localData.getWebServiceUrlTls() != null ? localData.getWebServiceUrlTls() + : localData.getWebServiceUrl(); } return String.format("http://%s", broker); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index ceeb5704fb2..d4380aee42a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils; import org.apache.pulsar.broker.loadbalance.LoadData; import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.ResourceUnit; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -119,8 +120,12 @@ public class ModularLoadManagerImplTest { private PulsarService pulsar3; private String primaryHost; + + private String primaryTlsHost; private String secondaryHost; + private String secondaryTlsHost; + private NamespaceBundleFactory nsFactory; private ModularLoadManagerImpl primaryLoadManager; @@ -166,16 +171,19 @@ public class ModularLoadManagerImplTest { config1.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder"); config1.setClusterName("use"); config1.setWebServicePort(Optional.of(0)); + config1.setWebServicePortTls(Optional.of(0)); config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config1.setAdvertisedAddress("localhost"); config1.setBrokerShutdownTimeoutMs(0L); config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config1.setBrokerServicePort(Optional.of(0)); + config1.setBrokerServicePortTls(Optional.of(0)); pulsar1 = new PulsarService(config1); pulsar1.start(); primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get()); + primaryTlsHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTPS().get()); url1 = new URL(pulsar1.getWebServiceAddress()); admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); @@ -185,11 +193,13 @@ public class ModularLoadManagerImplTest { config2.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder"); config2.setClusterName("use"); config2.setWebServicePort(Optional.of(0)); + config2.setWebServicePortTls(Optional.of(0)); config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config2.setAdvertisedAddress("localhost"); config2.setBrokerShutdownTimeoutMs(0L); config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config2.setBrokerServicePort(Optional.of(0)); + config2.setBrokerServicePortTls(Optional.of(0)); pulsar2 = new PulsarService(config2); pulsar2.start(); @@ -198,14 +208,17 @@ public class ModularLoadManagerImplTest { config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder"); config.setClusterName("use"); config.setWebServicePort(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config.setAdvertisedAddress("localhost"); config.setBrokerShutdownTimeoutMs(0L); config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); + config.setBrokerServicePortTls(Optional.of(0)); pulsar3 = new PulsarService(config); secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get()); + secondaryTlsHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTPS().get()); url2 = new URL(pulsar2.getWebServiceAddress()); admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); @@ -430,9 +443,9 @@ public class ModularLoadManagerImplTest { pulsar1.getConfiguration().setLoadBalancerEnabled(true); final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData"); final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData(); - final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost)); + final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryTlsHost)); when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData); - brokerDataMap.put(primaryHost, brokerDataSpy1); + brokerDataMap.put(primaryTlsHost, brokerDataSpy1); // Need to update all the bundle data for the shredder to see the spy. primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080")); @@ -450,7 +463,7 @@ public class ModularLoadManagerImplTest { verify(namespacesSpy1, Mockito.times(1)) .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); assertEquals(bundleReference.get(), mockBundleName(2)); - assertEquals(selectedBrokerRef.get().get(), secondaryHost); + assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost); primaryLoadManagerSpy.doLoadShedding(); // Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be @@ -458,13 +471,13 @@ public class ModularLoadManagerImplTest { verify(namespacesSpy1, Mockito.times(2)) .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); assertEquals(bundleReference.get(), mockBundleName(1)); - assertEquals(selectedBrokerRef.get().get(), secondaryHost); + assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost); primaryLoadManagerSpy.doLoadShedding(); // Now both are in grace period: neither should be unloaded. verify(namespacesSpy1, Mockito.times(2)) .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); - assertEquals(selectedBrokerRef.get().get(), secondaryHost); + assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost); // Test bundle transfer to same broker @@ -477,7 +490,7 @@ public class ModularLoadManagerImplTest { loadData.getRecentlyUnloadedBundles().clear(); primaryLoadManagerSpy.doLoadShedding(); // The bundle shouldn't be unloaded because the broker is the same. - verify(namespacesSpy1, Mockito.times(3)) + verify(namespacesSpy1, Mockito.times(4)) .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); } @@ -704,7 +717,7 @@ public class ModularLoadManagerImplTest { admin1.namespaces().createNamespace(namespace); @Cleanup - PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build(); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "/my-topic1") .create(); ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1 @@ -895,6 +908,10 @@ public class ModularLoadManagerImplTest { String topicToFindBundle = topicName + 0; NamespaceBundle bundleWillBeSplit = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle)); + final Optional<ResourceUnit> leastLoaded = loadManagerWrapper.getLeastLoaded(bundleWillBeSplit); + assertFalse(leastLoaded.isEmpty()); + assertTrue(leastLoaded.get().getResourceId().startsWith("https")); + String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/" + tenant + "/" + namespace; CompletableFuture<List<String>> children = bundlesCache.getChildren(bundleDataPath); List<String> bundles = children.join();
