This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5e0b424af2a3e7bae1c6f36890fe2b850fa0ed7f Author: Lari Hotari <[email protected]> AuthorDate: Thu Jan 25 11:36:17 2024 +0200 Revert "[fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633)" This reverts commit 91e073d1421932c2894e937e6b9fd2fada38be02. --- .../pulsar/broker/loadbalance/NoopLoadManager.java | 2 +- .../loadbalance/extensions/BrokerRegistryImpl.java | 2 +- .../loadbalance/impl/ModularLoadManagerImpl.java | 4 +- .../loadbalance/impl/SimpleLoadManagerImpl.java | 4 +- .../pulsar/broker/namespace/OwnershipCache.java | 6 +-- .../pulsar/broker/web/PulsarWebResource.java | 2 +- .../loadbalance/SimpleLoadManagerImplTest.java | 58 ++-------------------- 7 files changed, 15 insertions(+), 63 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index 80f887d394d..0de2ae92db6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -61,7 +61,7 @@ public class NoopLoadManager implements LoadManager { localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress), new PulsarResourceDescription()); - LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(), + LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners()); localData.setProtocols(pulsar.getProtocolDataToAdvertise()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index bfdaa078f19..921ce35b5c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -84,7 +84,7 @@ public class BrokerRegistryImpl implements BrokerRegistry { this.listeners = new ArrayList<>(); this.brokerId = pulsar.getLookupServiceAddress(); this.brokerLookupData = new BrokerLookupData( - pulsar.getWebServiceAddress(), + pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 14dde0cc81e..022f2fcbe39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -978,14 +978,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // At this point, the ports will be updated with the real port number that the server was assigned Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise(); - lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), + lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners()); lastData.setProtocols(protocolData); // configure broker-topic mode lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); - localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), + localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners()); localData.setProtocols(protocolData); localData.setBrokerVersionString(pulsar.getBrokerVersion()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index d54579a2861..5e994569711 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -234,7 +234,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar); } this.policies = new SimpleResourceAllocationPolicies(pulsar); - lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), + lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise()); lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); @@ -1072,7 +1072,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification private LoadReport generateLoadReportForcefully() throws Exception { synchronized (bundleGainsCache) { try { - LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), + LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); loadReport.setProtocols(pulsar.getProtocolDataToAdvertise()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 0033abf36c7..86003153714 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -122,10 +122,10 @@ public class OwnershipCache { this.ownerBrokerUrl = pulsar.getBrokerServiceUrl(); this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls(); this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, - pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), + pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners()); this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, - pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), + pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners()); this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class); this.locallyAcquiredLocks = new ConcurrentHashMap<>(); @@ -336,7 +336,7 @@ public class OwnershipCache { public synchronized boolean refreshSelfOwnerInfo() { this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(), - pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(), + pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners()); return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 35203e014bb..fa121b8eb4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1201,7 +1201,7 @@ public abstract class PulsarWebResource { protected void validateBrokerName(String broker) { String brokerUrl = String.format("http://%s", broker); String brokerUrlTls = String.format("https://%s", broker); - if (!brokerUrl.equals(pulsar().getWebServiceAddress()) + if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress()) && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) { String[] parts = broker.split(":"); checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index cf932ce5b60..c4898786e3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; @@ -55,16 +55,12 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; -import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.ResourceQuota; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; @@ -75,7 +71,6 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -97,14 +92,8 @@ public class SimpleLoadManagerImplTest { BrokerStats brokerStatsClient2; String primaryHost; - - String primaryTlsHost; - String secondaryHost; - private String defaultNamespace; - private String defaultTenant; - ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); @BeforeMethod @@ -118,7 +107,6 @@ public class SimpleLoadManagerImplTest { ServiceConfiguration config1 = new ServiceConfiguration(); config1.setClusterName("use"); config1.setWebServicePort(Optional.of(0)); - config1.setWebServicePortTls(Optional.of(0)); config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config1.setBrokerShutdownTimeoutMs(0L); config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); @@ -134,13 +122,11 @@ public class SimpleLoadManagerImplTest { admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); brokerStatsClient1 = admin1.brokerStats(); primaryHost = pulsar1.getWebServiceAddress(); - primaryTlsHost = pulsar1.getWebServiceAddressTls(); // Start broker 2 ServiceConfiguration config2 = new ServiceConfiguration(); config2.setClusterName("use"); config2.setWebServicePort(Optional.of(0)); - config2.setWebServicePortTls(Optional.of(0)); config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); config2.setBrokerShutdownTimeoutMs(0L); config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); @@ -157,8 +143,6 @@ public class SimpleLoadManagerImplTest { brokerStatsClient2 = admin2.brokerStats(); secondaryHost = pulsar2.getWebServiceAddress(); Thread.sleep(100); - - setupClusters(); } @AfterMethod(alwaysRun = true) @@ -270,9 +254,10 @@ public class SimpleLoadManagerImplTest { sortedRankingsInstance.get().put(lr.getRank(rd), rus); setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance); - final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")); + ResourceUnit found = loadManager + .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get(); // broker is not active so found should be null - assertFalse(leastLoaded.isPresent()); + assertNotEquals(found, null, "did not find a broker when expected one to be found"); } @@ -410,7 +395,7 @@ public class SimpleLoadManagerImplTest { final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get(); for (final NamespaceBundle bundle : bundles) { - if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost))) { + if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) { ++numAssignedToPrimary; } else { ++numAssignedToSecondary; @@ -422,10 +407,6 @@ public class SimpleLoadManagerImplTest { } } - private static String getAddress(String url) { - return url.replaceAll("https", "http"); - } - @Test public void testNamespaceBundleStats() { NamespaceBundleStats nsb1 = new NamespaceBundleStats(); @@ -494,33 +475,4 @@ public class SimpleLoadManagerImplTest { assertEquals(usage.getBandwidthIn().usage, usageLimit); } - @Test - public void testGetWebSerUrl() throws PulsarAdminException { - String webServiceUrl = admin1.brokerStats().getLoadReport().getWebServiceUrl(); - Assert.assertEquals(webServiceUrl, pulsar1.getWebServiceAddress()); - - String webServiceUrl2 = admin2.brokerStats().getLoadReport().getWebServiceUrl(); - Assert.assertEquals(webServiceUrl2, pulsar2.getWebServiceAddress()); - } - - @Test - public void testRedirectOwner() throws PulsarAdminException { - final String topicName = "persistent://" + defaultNamespace + "/" + "test-topic"; - admin1.topics().createNonPartitionedTopic(topicName); - TopicStats stats = admin1.topics().getStats(topicName); - Assert.assertNotNull(stats); - - TopicStats stats2 = admin2.topics().getStats(topicName); - Assert.assertNotNull(stats2); - } - - private void setupClusters() throws PulsarAdminException { - admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build()); - TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use")); - defaultTenant = "prop-xyz"; - admin1.tenants().createTenant(defaultTenant, tenantInfo); - defaultNamespace = defaultTenant + "/ns1"; - admin1.namespaces().createNamespace(defaultNamespace, Set.of("use")); - } - }
