This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f8067b50c0d [fix][broker] Fix returns wrong webServiceUrl when both 
webServicePort and webServicePortTls are set (#21633)
f8067b50c0d is described below

commit f8067b50c0d68cb723e5e5cd681b1697329ae012
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Dec 5 20:18:53 2023 +0800

    [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and 
webServicePortTls are set (#21633)
    
    Co-authored-by: Jiwe Guo <[email protected]>
---
 .../pulsar/broker/loadbalance/NoopLoadManager.java |  2 +-
 .../loadbalance/extensions/BrokerRegistryImpl.java |  2 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  4 +-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  4 +-
 .../pulsar/broker/namespace/OwnershipCache.java    |  6 +--
 .../pulsar/broker/web/PulsarWebResource.java       |  2 +-
 .../loadbalance/SimpleLoadManagerImplTest.java     | 58 ++++++++++++++++++++--
 7 files changed, 63 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 0de2ae92db6..80f887d394d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -61,7 +61,7 @@ public class NoopLoadManager implements LoadManager {
         localResourceUnit = new SimpleResourceUnit(String.format("http://%s", 
lookupServiceAddress),
                 new PulsarResourceDescription());
 
-        LocalBrokerData localData = new 
LocalBrokerData(pulsar.getSafeWebServiceAddress(),
+        LocalBrokerData localData = new 
LocalBrokerData(pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), 
pulsar.getAdvertisedListeners());
         localData.setProtocols(pulsar.getProtocolDataToAdvertise());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 921ce35b5c6..bfdaa078f19 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -84,7 +84,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         this.listeners = new ArrayList<>();
         this.brokerId = pulsar.getLookupServiceAddress();
         this.brokerLookupData = new BrokerLookupData(
-                pulsar.getSafeWebServiceAddress(),
+                pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(),
                 pulsar.getBrokerServiceUrlTls(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 49c58afa3b9..4ecdfefbdd0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -961,14 +961,14 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
             // At this point, the ports will be updated with the real port 
number that the server was assigned
             Map<String, String> protocolData = 
pulsar.getProtocolDataToAdvertise();
 
-            lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
+            lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
                     pulsar.getBrokerServiceUrl(), 
pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
             lastData.setProtocols(protocolData);
             // configure broker-topic mode
             
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
             
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
 
-            localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
+            localData = new LocalBrokerData(pulsar.getWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
                     pulsar.getBrokerServiceUrl(), 
pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
             localData.setProtocols(protocolData);
             localData.setBrokerVersionString(pulsar.getBrokerVersion());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index ce46bd932f1..ee60595a485 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -234,7 +234,7 @@ public class SimpleLoadManagerImpl implements LoadManager, 
Consumer<Notification
             brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
         }
         this.policies = new SimpleResourceAllocationPolicies(pulsar);
-        lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
+        lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
         
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
@@ -1072,7 +1072,7 @@ public class SimpleLoadManagerImpl implements 
LoadManager, Consumer<Notification
     private LoadReport generateLoadReportForcefully() throws Exception {
         synchronized (bundleGainsCache) {
             try {
-                LoadReport loadReport = new 
LoadReport(pulsar.getSafeWebServiceAddress(),
+                LoadReport loadReport = new 
LoadReport(pulsar.getWebServiceAddress(),
                         pulsar.getWebServiceAddressTls(), 
pulsar.getBrokerServiceUrl(),
                         pulsar.getBrokerServiceUrlTls());
                 loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 86003153714..0033abf36c7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -122,10 +122,10 @@ public class OwnershipCache {
         this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
         this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
         this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, 
ownerBrokerUrlTls,
-                pulsar.getSafeWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
+                pulsar.getWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
                 false, pulsar.getAdvertisedListeners());
         this.selfOwnerInfoDisabled = new 
NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
-                pulsar.getSafeWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
+                pulsar.getWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
                 true, pulsar.getAdvertisedListeners());
         this.lockManager = 
pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
         this.locallyAcquiredLocks = new ConcurrentHashMap<>();
@@ -336,7 +336,7 @@ public class OwnershipCache {
 
     public synchronized boolean refreshSelfOwnerInfo() {
         this.selfOwnerInfo = new 
NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
-                pulsar.getBrokerServiceUrlTls(), 
pulsar.getSafeWebServiceAddress(),
+                pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(), false, 
pulsar.getAdvertisedListeners());
         return selfOwnerInfo.getNativeUrl() != null || 
selfOwnerInfo.getNativeUrlTls() != null;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5602f662f50..e8192cde3fd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -1205,7 +1205,7 @@ public abstract class PulsarWebResource {
     protected void validateBrokerName(String broker) {
         String brokerUrl = String.format("http://%s", broker);
         String brokerUrlTls = String.format("https://%s", broker);
-        if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
+        if (!brokerUrl.equals(pulsar().getWebServiceAddress())
                 && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
             String[] parts = broker.split(":");
             checkArgument(parts.length == 2, String.format("Invalid broker url 
%s", broker));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 7f2767b2e77..43706129fbe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,12 +56,16 @@ import 
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -72,6 +76,7 @@ import 
org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -93,8 +98,14 @@ public class SimpleLoadManagerImplTest {
     BrokerStats brokerStatsClient2;
 
     String primaryHost;
+
+    String primaryTlsHost;
+
     String secondaryHost;
 
+    private String defaultNamespace;
+    private String defaultTenant;
+
     ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, 
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
 
     @BeforeMethod
@@ -108,6 +119,7 @@ public class SimpleLoadManagerImplTest {
         ServiceConfiguration config1 = new ServiceConfiguration();
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
+        config1.setWebServicePortTls(Optional.of(0));
         config1.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -122,11 +134,13 @@ public class SimpleLoadManagerImplTest {
         admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
         brokerStatsClient1 = admin1.brokerStats();
         primaryHost = pulsar1.getWebServiceAddress();
+        primaryTlsHost = pulsar1.getWebServiceAddressTls();
 
         // Start broker 2
         ServiceConfiguration config2 = new ServiceConfiguration();
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
+        config2.setWebServicePortTls(Optional.of(0));
         config2.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -143,6 +157,8 @@ public class SimpleLoadManagerImplTest {
         brokerStatsClient2 = admin2.brokerStats();
         secondaryHost = pulsar2.getWebServiceAddress();
         Thread.sleep(100);
+
+        setupClusters();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -256,10 +272,9 @@ public class SimpleLoadManagerImplTest {
         sortedRankingsInstance.get().put(lr.getRank(rd), rus);
         setObjectField(SimpleLoadManagerImpl.class, loadManager, 
"sortedRankings", sortedRankingsInstance);
 
-        ResourceUnit found = loadManager
-                
.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
+        final Optional<ResourceUnit> leastLoaded = 
loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
         // broker is not active so found should be null
-        assertNotEquals(found, null, "did not find a broker when expected one 
to be found");
+        assertFalse(leastLoaded.isPresent());
 
     }
 
@@ -399,7 +414,7 @@ public class SimpleLoadManagerImplTest {
         final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) 
pulsar1.getLoadManager().get();
 
         for (final NamespaceBundle bundle : bundles) {
-            if 
(loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) {
+            if 
(loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost)))
 {
                 ++numAssignedToPrimary;
             } else {
                 ++numAssignedToSecondary;
@@ -411,6 +426,10 @@ public class SimpleLoadManagerImplTest {
         }
     }
 
+    private static String getAddress(String url) {
+        return url.replaceAll("https", "http");
+    }
+
     @Test
     public void testNamespaceBundleStats() {
         NamespaceBundleStats nsb1 = new NamespaceBundleStats();
@@ -479,4 +498,33 @@ public class SimpleLoadManagerImplTest {
         assertEquals(usage.getBandwidthIn().usage, usageLimit);
     }
 
+    @Test
+    public void testGetWebSerUrl() throws PulsarAdminException {
+        String webServiceUrl = 
admin1.brokerStats().getLoadReport().getWebServiceUrl();
+        Assert.assertEquals(webServiceUrl, pulsar1.getWebServiceAddress());
+
+        String webServiceUrl2 = 
admin2.brokerStats().getLoadReport().getWebServiceUrl();
+        Assert.assertEquals(webServiceUrl2, pulsar2.getWebServiceAddress());
+    }
+
+    @Test
+    public void testRedirectOwner() throws PulsarAdminException {
+        final String topicName = "persistent://" + defaultNamespace + "/" + 
"test-topic";
+        admin1.topics().createNonPartitionedTopic(topicName);
+        TopicStats stats = admin1.topics().getStats(topicName);
+        Assert.assertNotNull(stats);
+
+        TopicStats stats2 = admin2.topics().getStats(topicName);
+        Assert.assertNotNull(stats2);
+    }
+
+    private void setupClusters() throws PulsarAdminException {
+        admin1.clusters().createCluster("use", 
ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("use"));
+        defaultTenant = "prop-xyz";
+        admin1.tenants().createTenant(defaultTenant, tenantInfo);
+        defaultNamespace = defaultTenant + "/ns1";
+        admin1.namespaces().createNamespace(defaultNamespace, Set.of("use"));
+    }
+
 }

Reply via email to