This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f9af4245e0b [fix] [broker] Incorrect service name selection logic (#19505)
f9af4245e0b is described below

commit f9af4245e0b05c382656fc674fdaeda26487258c
Author: fengyubiao <[email protected]>
AuthorDate: Wed Feb 15 23:02:30 2023 +0800

    [fix] [broker] Incorrect service name selection logic (#19505)
    
    When calling the method `PulsarWebResource.getRedirectionUrl`, reuse the same `PulsarServiceNameResolver` instance.
---
 .../pulsar/broker/web/PulsarWebResource.java       | 31 ++++++++++++++++++----
 .../pulsar/broker/service/ReplicatorTest.java      |  3 ++-
 .../client/impl/PulsarServiceNameResolver.java     |  5 ++--
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index fb80a3e7983..82246ad6494 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,12 +21,16 @@ package org.apache.pulsar.broker.web;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -86,6 +90,8 @@ import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.path.PolicyPath;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +102,17 @@ public abstract class PulsarWebResource {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarWebResource.class);
 
+    private static final LoadingCache<String, PulsarServiceNameResolver> 
SERVICE_NAME_RESOLVER_CACHE =
+            
Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(5)).build(
+                    new CacheLoader<>() {
+                        @Override
+                        public @Nullable PulsarServiceNameResolver 
load(@NonNull String serviceUrl) throws Exception {
+                            PulsarServiceNameResolver serviceNameResolver = 
new PulsarServiceNameResolver();
+                            serviceNameResolver.updateServiceUrl(serviceUrl);
+                            return serviceNameResolver;
+                        }
+                    });
+
     static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
 
     @Context
@@ -476,17 +493,21 @@ public abstract class PulsarWebResource {
 
     private URI getRedirectionUrl(ClusterData differentClusterData) throws 
MalformedURLException {
         try {
-            PulsarServiceNameResolver serviceNameResolver = new 
PulsarServiceNameResolver();
+            PulsarServiceNameResolver serviceNameResolver;
             if (isRequestHttps() && 
pulsar.getConfiguration().getWebServicePortTls().isPresent()
                     && 
StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
-                
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
+                serviceNameResolver = 
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrlTls());
             } else {
-                
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
+                serviceNameResolver = 
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrl());
             }
             URL webUrl = new 
URL(serviceNameResolver.resolveHostUri().toString());
             return 
UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
-        } catch (PulsarClientException.InvalidServiceURL exception) {
-            throw new MalformedURLException(exception.getMessage());
+        } catch (Exception exception) {
+            if (exception.getCause() != null
+                    && exception.getCause() instanceof 
PulsarClientException.InvalidServiceURL) {
+                throw new MalformedURLException(exception.getMessage());
+            }
+            throw exception;
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1c8c86c9434..ab4f6a5c7f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -233,7 +233,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
         pulsar1.getConfiguration().setAuthorizationEnabled(true);
         //init clusterData
 
-        String cluster2ServiceUrls = 
String.format("%s,localhost:1234,localhost:5678", 
pulsar2.getWebServiceAddress());
+        String cluster2ServiceUrls = 
String.format("%s,localhost:1234,localhost:5678,localhost:5677,localhost:5676",
+                pulsar2.getWebServiceAddress());
         ClusterData cluster2Data = 
ClusterData.builder().serviceUrl(cluster2ServiceUrls).build();
         String cluster2 = "activeCLuster2";
         admin2.clusters().createCluster(cluster2, cluster2Data);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 32f5aa4975c..e47750be462 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -52,9 +52,8 @@ public class PulsarServiceNameResolver implements 
ServiceNameResolver {
         if (list.size() == 1) {
             return list.get(0);
         } else {
-            CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) % 
list.size());
-            return list.get(currentIndex);
-
+            int originalIndex = CURRENT_INDEX_UPDATER.getAndUpdate(this, last 
-> (last + 1) % list.size());
+            return list.get((originalIndex + 1) % list.size());
         }
     }
 

Reply via email to