This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f9af4245e0b [fix] [broker] Incorrect service name selection logic
(#19505)
f9af4245e0b is described below
commit f9af4245e0b05c382656fc674fdaeda26487258c
Author: fengyubiao <[email protected]>
AuthorDate: Wed Feb 15 23:02:30 2023 +0800
[fix] [broker] Incorrect service name selection logic (#19505)
When calling the method `PulsarWebResource.getRedirectionUrl`, reuse the
same `PulsarServiceNameResolver` instance.
---
.../pulsar/broker/web/PulsarWebResource.java | 31 ++++++++++++++++++----
.../pulsar/broker/service/ReplicatorTest.java | 3 ++-
.../client/impl/PulsarServiceNameResolver.java | 5 ++--
3 files changed, 30 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index fb80a3e7983..82246ad6494 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,12 +21,16 @@ package org.apache.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -86,6 +90,8 @@ import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +102,17 @@ public abstract class PulsarWebResource {
private static final Logger log =
LoggerFactory.getLogger(PulsarWebResource.class);
+ private static final LoadingCache<String, PulsarServiceNameResolver>
SERVICE_NAME_RESOLVER_CACHE =
+
Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(5)).build(
+ new CacheLoader<>() {
+ @Override
+ public @Nullable PulsarServiceNameResolver
load(@NonNull String serviceUrl) throws Exception {
+ PulsarServiceNameResolver serviceNameResolver =
new PulsarServiceNameResolver();
+ serviceNameResolver.updateServiceUrl(serviceUrl);
+ return serviceNameResolver;
+ }
+ });
+
static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
@Context
@@ -476,17 +493,21 @@ public abstract class PulsarWebResource {
private URI getRedirectionUrl(ClusterData differentClusterData) throws
MalformedURLException {
try {
- PulsarServiceNameResolver serviceNameResolver = new
PulsarServiceNameResolver();
+ PulsarServiceNameResolver serviceNameResolver;
if (isRequestHttps() &&
pulsar.getConfiguration().getWebServicePortTls().isPresent()
&&
StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
-
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
+ serviceNameResolver =
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrlTls());
} else {
-
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
+ serviceNameResolver =
SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrl());
}
URL webUrl = new
URL(serviceNameResolver.resolveHostUri().toString());
return
UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
- } catch (PulsarClientException.InvalidServiceURL exception) {
- throw new MalformedURLException(exception.getMessage());
+ } catch (Exception exception) {
+ if (exception.getCause() != null
+ && exception.getCause() instanceof
PulsarClientException.InvalidServiceURL) {
+ throw new MalformedURLException(exception.getMessage());
+ }
+ throw exception;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1c8c86c9434..ab4f6a5c7f8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -233,7 +233,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
pulsar1.getConfiguration().setAuthorizationEnabled(true);
//init clusterData
- String cluster2ServiceUrls =
String.format("%s,localhost:1234,localhost:5678",
pulsar2.getWebServiceAddress());
+ String cluster2ServiceUrls =
String.format("%s,localhost:1234,localhost:5678,localhost:5677,localhost:5676",
+ pulsar2.getWebServiceAddress());
ClusterData cluster2Data =
ClusterData.builder().serviceUrl(cluster2ServiceUrls).build();
String cluster2 = "activeCLuster2";
admin2.clusters().createCluster(cluster2, cluster2Data);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 32f5aa4975c..e47750be462 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -52,9 +52,8 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
if (list.size() == 1) {
return list.get(0);
} else {
- CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) %
list.size());
- return list.get(currentIndex);
-
+ int originalIndex = CURRENT_INDEX_UPDATER.getAndUpdate(this, last
-> (last + 1) % list.size());
+ return list.get((originalIndex + 1) % list.size());
}
}