This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 823f1fc81c7884781e9adf8a7ccb1601fb3531dc Author: feynmanlin <[email protected]> AuthorDate: Wed Jan 13 18:27:50 2021 +0800 Fix admin-api-brokers list failed (#9191) Fixes #9128 ### Motivation There is a bug in parsing the cluster service URL: the current address resolution does not support multiple addresses, such as `http://host1:8080,host2:8080,host3:8080`. ### Modifications Let URI resolution support multiple addresses ### Verifying this change unit test: activeBrokerParse (cherry picked from commit 908dae990213db9bb6922257a451a548d6ecf1ef) --- .../apache/pulsar/broker/web/PulsarWebResource.java | 21 ++++++++++++++------- .../pulsar/broker/service/ReplicatorTest.java | 20 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 677d81e..74176a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -55,6 +55,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundles; @@ -368,14 +370,19 @@ public abstract class PulsarWebResource { } private URI getRedirectionUrl(ClusterData differentClusterData) throws MalformedURLException { - URL webUrl = null; - if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent() - && 
StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) { - webUrl = new URL(differentClusterData.getServiceUrlTls()); - } else { - webUrl = new URL(differentClusterData.getServiceUrl()); + try { + PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver(); + if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent() + && StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) { + serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls()); + } else { + serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl()); + } + URL webUrl = new URL(serviceNameResolver.resolveHostUri().toString()); + return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build(); + } catch (PulsarClientException.InvalidServiceURL exception) { + throw new MalformedURLException(exception.getMessage()); } - return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build(); } protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster(PulsarService pulsar, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index b1580d1..41d085b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -69,9 +69,11 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.protocol.Commands; import 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -198,6 +200,24 @@ public class ReplicatorTest extends ReplicatorTestBase { // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters } + @Test(timeOut = 10000) + public void activeBrokerParse() throws Exception { + pulsar1.getConfiguration().setAuthorizationEnabled(true); + //init clusterData + ClusterData cluster2Data = new ClusterData(); + String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678", pulsar2.getWebServiceAddress()); + cluster2Data.setServiceUrl(cluster2ServiceUrls); + String cluster2 = "activeCLuster2"; + admin2.clusters().createCluster(cluster2, cluster2Data); + Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() + -> admin2.clusters().getCluster(cluster2) != null); + + List<String> list = admin1.brokers().getActiveBrokers(cluster2); + assertEquals(list.get(0), url2.toString().replace("http://", "")); + //restore configuration + pulsar1.getConfiguration().setAuthorizationEnabled(false); + } + @SuppressWarnings("unchecked") @Test(timeOut = 30000) public void testConcurrentReplicator() throws Exception {
