This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 86206106f07dfd8e3b99c63b5f289e8b18d02593
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Jan 3 22:01:28 2024 +0800

    [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21842)
    
    (cherry picked from commit e10d318d60aab55532ab256a705a90780354cdc6)
---
 .../impl/ModularLoadManagerWrapper.java            |  4 +--
 .../impl/ModularLoadManagerImplTest.java           | 31 +++++++++++++++++-----
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index c61d39cf315..63bc7ab07fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -78,8 +78,8 @@ public class ModularLoadManagerWrapper implements LoadManager 
{
     private String getBrokerWebServiceUrl(String broker) {
         LocalBrokerData localData = (loadManager).getBrokerLocalData(broker);
         if (localData != null) {
-            return localData.getWebServiceUrl() != null ? localData.getWebServiceUrl()
-                    : localData.getWebServiceUrlTls();
+            return localData.getWebServiceUrlTls() != null ? localData.getWebServiceUrlTls()
+                    : localData.getWebServiceUrl();
         }
         return String.format("http://%s", broker);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index ceeb5704fb2..d4380aee42a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -119,8 +120,12 @@ public class ModularLoadManagerImplTest {
     private PulsarService pulsar3;
 
     private String primaryHost;
+
+    private String primaryTlsHost;
     private String secondaryHost;
 
+    private String secondaryTlsHost;
+
     private NamespaceBundleFactory nsFactory;
 
     private ModularLoadManagerImpl primaryLoadManager;
@@ -166,16 +171,19 @@ public class ModularLoadManagerImplTest {
         
config1.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
+        config1.setWebServicePortTls(Optional.of(0));
         config1.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
 
         config1.setAdvertisedAddress("localhost");
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config1.setBrokerServicePort(Optional.of(0));
+        config1.setBrokerServicePortTls(Optional.of(0));
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
 
         primaryHost = String.format("%s:%d", "localhost", 
pulsar1.getListenPortHTTP().get());
+        primaryTlsHost = String.format("%s:%d", "localhost", 
pulsar1.getListenPortHTTPS().get());
         url1 = new URL(pulsar1.getWebServiceAddress());
         admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
 
@@ -185,11 +193,13 @@ public class ModularLoadManagerImplTest {
         
config2.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
+        config2.setWebServicePortTls(Optional.of(0));
         config2.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config2.setAdvertisedAddress("localhost");
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config2.setBrokerServicePort(Optional.of(0));
+        config2.setBrokerServicePortTls(Optional.of(0));
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
 
@@ -198,14 +208,17 @@ public class ModularLoadManagerImplTest {
         
config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
         config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setAdvertisedAddress("localhost");
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
         pulsar3 = new PulsarService(config);
 
         secondaryHost = String.format("%s:%d", "localhost", 
pulsar2.getListenPortHTTP().get());
+        secondaryTlsHost = String.format("%s:%d", "localhost", 
pulsar2.getListenPortHTTPS().get());
         url2 = new URL(pulsar2.getWebServiceAddress());
         admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
 
@@ -430,9 +443,9 @@ public class ModularLoadManagerImplTest {
         pulsar1.getConfiguration().setLoadBalancerEnabled(true);
         final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, 
"loadData");
         final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
-        final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
+        final BrokerData brokerDataSpy1 = 
spy(brokerDataMap.get(primaryTlsHost));
         when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
-        brokerDataMap.put(primaryHost, brokerDataSpy1);
+        brokerDataMap.put(primaryTlsHost, brokerDataSpy1);
         // Need to update all the bundle data for the shredder to see the spy.
         primaryLoadManagerSpy.handleDataNotification(new 
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + 
"/broker:8080"));
 
@@ -450,7 +463,7 @@ public class ModularLoadManagerImplTest {
         verify(namespacesSpy1, Mockito.times(1))
                 .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(2));
-        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
+        assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
 
         primaryLoadManagerSpy.doLoadShedding();
         // Now less expensive bundle will be unloaded (normally other bundle 
would move off and nothing would be
@@ -458,13 +471,13 @@ public class ModularLoadManagerImplTest {
         verify(namespacesSpy1, Mockito.times(2))
                 .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
         assertEquals(bundleReference.get(), mockBundleName(1));
-        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
+        assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
 
         primaryLoadManagerSpy.doLoadShedding();
         // Now both are in grace period: neither should be unloaded.
         verify(namespacesSpy1, Mockito.times(2))
                 .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
-        assertEquals(selectedBrokerRef.get().get(), secondaryHost);
+        assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);
 
         // Test bundle transfer to same broker
 
@@ -477,7 +490,7 @@ public class ModularLoadManagerImplTest {
         loadData.getRecentlyUnloadedBundles().clear();
         primaryLoadManagerSpy.doLoadShedding();
         // The bundle shouldn't be unloaded because the broker is the same.
-        verify(namespacesSpy1, Mockito.times(3))
+        verify(namespacesSpy1, Mockito.times(4))
                 .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
 
     }
@@ -704,7 +717,7 @@ public class ModularLoadManagerImplTest {
         admin1.namespaces().createNamespace(namespace);
 
         @Cleanup
-        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build();
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://" + namespace + "/my-topic1")
                 .create();
         ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) 
((ModularLoadManagerWrapper) pulsar1
@@ -895,6 +908,10 @@ public class ModularLoadManagerImplTest {
         String topicToFindBundle = topicName + 0;
         NamespaceBundle bundleWillBeSplit = 
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
 
+        final Optional<ResourceUnit> leastLoaded = 
loadManagerWrapper.getLeastLoaded(bundleWillBeSplit);
+        assertFalse(leastLoaded.isEmpty());
+        assertTrue(leastLoaded.get().getResourceId().startsWith("https"));
+
         String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/" 
+ tenant + "/" + namespace;
         CompletableFuture<List<String>> children = 
bundlesCache.getChildren(bundleDataPath);
         List<String> bundles = children.join();

Reply via email to