This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1fc5cf1495417bc89f650b816c0906b9a1103697 Author: Cong Zhao <[email protected]> AuthorDate: Tue Dec 5 20:18:53 2023 +0800 [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633) Co-authored-by: Jiwe Guo <[email protected]> (cherry picked from commit f8067b50c0d68cb723e5e5cd681b1697329ae012) --- .../pulsar/broker/loadbalance/NoopLoadManager.java | 2 +- .../loadbalance/extensions/BrokerRegistryImpl.java | 2 +- .../loadbalance/impl/ModularLoadManagerImpl.java | 4 +- .../loadbalance/impl/SimpleLoadManagerImpl.java | 4 +- .../pulsar/broker/namespace/OwnershipCache.java | 6 +-- .../pulsar/broker/web/PulsarWebResource.java | 2 +- .../loadbalance/SimpleLoadManagerImplTest.java | 58 ++++++++++++++++++++-- 7 files changed, 63 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index 0de2ae92db6..80f887d394d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -61,7 +61,7 @@ public class NoopLoadManager implements LoadManager { localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress), new PulsarResourceDescription()); - LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), + LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners()); localData.setProtocols(pulsar.getProtocolDataToAdvertise()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 921ce35b5c6..bfdaa078f19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -84,7 +84,7 @@ public class BrokerRegistryImpl implements BrokerRegistry { this.listeners = new ArrayList<>(); this.brokerId = pulsar.getLookupServiceAddress(); this.brokerLookupData = new BrokerLookupData( - pulsar.getSafeWebServiceAddress(), + pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index b664b1ca2ba..255d7aa77af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -980,14 +980,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // At this point, the ports will be updated with the real port number that the server was assigned Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise(); - lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), + lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners()); lastData.setProtocols(protocolData); // configure broker-topic mode lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); - localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), + localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners()); localData.setProtocols(protocolData); localData.setBrokerVersionString(pulsar.getBrokerVersion()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 5e994569711..d54579a2861 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -234,7 +234,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar); } this.policies = new SimpleResourceAllocationPolicies(pulsar); - lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), + lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise()); lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); @@ -1072,7 +1072,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification private LoadReport generateLoadReportForcefully() throws Exception { synchronized (bundleGainsCache) { try { - LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), + LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); loadReport.setProtocols(pulsar.getProtocolDataToAdvertise()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 86003153714..0033abf36c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -122,10 +122,10 @@ public class OwnershipCache { this.ownerBrokerUrl = pulsar.getBrokerServiceUrl(); this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls(); this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, - pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), + pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners()); this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, - pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), + pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners()); this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class); this.locallyAcquiredLocks = new ConcurrentHashMap<>(); @@ -336,7 +336,7 @@ public class OwnershipCache { public synchronized boolean refreshSelfOwnerInfo() { this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(), - pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(), + pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners()); return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index bdb052d7b9e..de1296dbdd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1205,7 +1205,7 @@ public abstract class PulsarWebResource { protected void validateBrokerName(String broker) { String brokerUrl = String.format("http://%s", broker); String brokerUrlTls = String.format("https://%s", broker); - if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress()) + if (!brokerUrl.equals(pulsar().getWebServiceAddress()) && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) { String[] parts = broker.split(":"); checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 6303c70b4dc..820d966f62d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; @@ -55,12 +55,16 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.ResourceQuota; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; @@ -71,6 +75,7 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -92,8 +97,14 @@ public class SimpleLoadManagerImplTest { BrokerStats brokerStatsClient2; String primaryHost; + + String primaryTlsHost; + String secondaryHost; + private String defaultNamespace; + private String defaultTenant; + ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); @BeforeMethod @@ -107,6 +118,7 @@ public class SimpleLoadManagerImplTest { ServiceConfiguration config1 = new ServiceConfiguration(); config1.setClusterName("use"); config1.setWebServicePort(Optional.of(0)); + config1.setWebServicePortTls(Optional.of(0)); config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config1.setBrokerShutdownTimeoutMs(0L); config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); @@ -121,11 +133,13 @@ public class SimpleLoadManagerImplTest { admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); brokerStatsClient1 = admin1.brokerStats(); primaryHost = pulsar1.getWebServiceAddress(); + primaryTlsHost = pulsar1.getWebServiceAddressTls(); // Start broker 2 ServiceConfiguration config2 = new ServiceConfiguration(); config2.setClusterName("use"); config2.setWebServicePort(Optional.of(0)); + config2.setWebServicePortTls(Optional.of(0)); config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config2.setBrokerShutdownTimeoutMs(0L); config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); @@ -142,6 +156,8 @@ public class SimpleLoadManagerImplTest { brokerStatsClient2 = admin2.brokerStats(); secondaryHost = pulsar2.getWebServiceAddress(); Thread.sleep(100); + + setupClusters(); } @AfterMethod(alwaysRun = true) @@ -253,10 +269,9 @@ public class SimpleLoadManagerImplTest { sortedRankingsInstance.get().put(lr.getRank(rd), rus); setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance); - ResourceUnit found = loadManager - .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get(); + final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")); // broker is not active so found should be null - assertNotEquals(found, null, "did not find a broker when expected one to be found"); + assertFalse(leastLoaded.isPresent()); } @@ -394,7 +409,7 @@ public class SimpleLoadManagerImplTest { final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get(); for (final NamespaceBundle bundle : bundles) { - if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) { + if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost))) { ++numAssignedToPrimary; } else { ++numAssignedToSecondary; @@ -406,6 +421,10 @@ public class SimpleLoadManagerImplTest { } } + private static String getAddress(String url) { + return url.replaceAll("https", "http"); + } + @Test public void testNamespaceBundleStats() { NamespaceBundleStats nsb1 = new NamespaceBundleStats(); @@ -474,4 +493,33 @@ public class SimpleLoadManagerImplTest { assertEquals(usage.getBandwidthIn().usage, usageLimit); } + @Test + public void testGetWebSerUrl() throws PulsarAdminException { + String webServiceUrl = admin1.brokerStats().getLoadReport().getWebServiceUrl(); + Assert.assertEquals(webServiceUrl, pulsar1.getWebServiceAddress()); + + String webServiceUrl2 = admin2.brokerStats().getLoadReport().getWebServiceUrl(); + Assert.assertEquals(webServiceUrl2, pulsar2.getWebServiceAddress()); + } + + @Test + public void testRedirectOwner() throws PulsarAdminException { + final String topicName = "persistent://" + defaultNamespace + "/" + "test-topic"; + admin1.topics().createNonPartitionedTopic(topicName); + TopicStats stats = admin1.topics().getStats(topicName); + Assert.assertNotNull(stats); + + TopicStats stats2 = admin2.topics().getStats(topicName); + Assert.assertNotNull(stats2); + } + + private void setupClusters() throws PulsarAdminException { + admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use")); + defaultTenant = "prop-xyz"; + admin1.tenants().createTenant(defaultTenant, tenantInfo); + defaultNamespace = defaultTenant + "/ns1"; + admin1.namespaces().createNamespace(defaultNamespace, Set.of("use")); + } + }
