This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch lh-support-webservice-bindaddresses-and-advertised-listeners
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 60e5a4cce161958e79f2bc23bb31a5677b69154a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 20 19:31:22 2026 +0300

    Address review comment
---
 .../apache/pulsar/broker/lookup/LookupResult.java  | 27 ++++++++++++++--------
 .../broker/namespace/NamespaceEphemeralData.java   |  6 +++++
 .../pulsar/broker/namespace/NamespaceService.java  | 17 +++++++-------
 .../pulsar/broker/web/RestProducerContext.java     |  2 +-
 .../pulsar/broker/lookup/LookupResultTest.java     |  6 ++++-
 5 files changed, 38 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
index 3d46ef16cda..c45614a8be3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
@@ -22,6 +22,8 @@ import static 
org.apache.pulsar.broker.lookup.v2.TopicLookup.LISTENERNAME_PARAM;
 import java.net.URI;
 import java.util.Map;
 import java.util.Objects;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
 import lombok.Builder;
 import lombok.Getter;
@@ -176,14 +178,14 @@ public class LookupResult {
             }
         }
 
-        // for backwards compatibility, parse the brokerId from webServiceUrl 
or webServiceUrlTls
-        // this might be the case temporarily when the broker upgrade happens 
and there are mixed versions
-        // of brokers in the cluster
+        // for backwards compatibility, derive the brokerId from webServiceUrl 
or webServiceUrlTls by
+        // parsing the URL and taking host:port. This is a transient state 
that may occur during a
+        // rolling upgrade when older brokers in the cluster do not yet 
publish a brokerId in their
+        // ephemeral data; once all brokers have been upgraded, the brokerId 
field is always populated.
         if (brokerId == null && (webServiceUrl != null || webServiceUrlTls != 
null)) {
-            if (webServiceUrl != null) {
-                brokerId = webServiceUrl.substring("http://".length());
-            } else {
-                brokerId = webServiceUrlTls.substring("https://".length());
+            URI url = URI.create(webServiceUrl != null ? webServiceUrl : 
webServiceUrlTls);
+            if (url.getHost() != null && url.getPort() != -1) {
+                brokerId = url.getHost() + ":" + url.getPort();
             }
         }
 
@@ -246,9 +248,14 @@ public class LookupResult {
     private URI toRedirectUriInternal(URI requestUri, boolean 
authoritativeRedirect) {
         boolean requireHttps = 
"https".equalsIgnoreCase(requestUri.getScheme());
         String webServiceUrl = requireHttps ? lookupData.getHttpUrlTls() : 
lookupData.getHttpUrl();
-        Objects.requireNonNull(webServiceUrl, () -> "No "
-                + (requireHttps ? "https" : "http")
-                + " URL configured for broker " + lookupData.getBrokerId());
+        if (webServiceUrl == null) {
+            // Preserve the legacy 412 error semantics when the redirect 
target broker has no URL
+            // configured for the requested scheme.
+            throw new 
WebApplicationException(Response.status(Response.Status.PRECONDITION_FAILED)
+                    .entity("No " + (requireHttps ? "https" : "http")
+                            + " URL configured for broker " + 
lookupData.getBrokerId())
+                    .build());
+        }
         URI webServiceUri = URI.create(webServiceUrl);
         UriBuilder uriBuilder =
                 UriBuilder.fromUri(requestUri) // use the path and query 
parameters from the request URI
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
index 4b4cb53ff0a..a580eb1314b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.jspecify.annotations.NonNull;
@@ -29,6 +30,11 @@ import org.jspecify.annotations.NonNull;
 @Data
 @NoArgsConstructor
 public class NamespaceEphemeralData {
+    // The brokerId field was added later. During a rolling upgrade, entries 
written by older brokers may not
+    // contain it (LookupResult derives it from the URL host:port in that 
case). Exclude it from equals/hashCode
+    // so that consumers of this object do not see spurious "changed" 
notifications when the same logical owner
+    // transitions from a missing brokerId to a populated one.
+    @EqualsAndHashCode.Exclude
     private String brokerId;
     private String nativeUrl;
     private String nativeUrlTls;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index b1d97a6a9cc..db6ce90c203 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -328,11 +328,14 @@ public class NamespaceService implements AutoCloseable {
     }
 
     /**
-     * Return the LookupResult of the broker who's owning a particular service 
unit.
-     * The LookupResult won't necessary be the broker who is owning the 
service unit. When the cluster contains
-     * multiple brokers with different load manager implementations, the 
LookupResult will be
-     * <p>
-     * If the service unit is not owned, return an empty optional
+     * Return the LookupResult of the broker that owns a particular service 
unit.
+     *
+     * <p>The returned LookupResult will not necessarily point to the broker 
that currently owns the service unit.
+     * When the cluster contains multiple brokers with different load manager 
implementations (e.g. during a
+     * load-manager migration), the LookupResult may point to a broker running 
the expected load manager, so the
+     * caller redirects the request to a broker that can handle it.
+     *
+     * <p>If the service unit is not owned, return an empty optional.
      */
     public Optional<LookupResult> getLookupResultForWebRequest(ServiceUnitId 
suName, LookupOptions options)
             throws Exception {
@@ -573,9 +576,7 @@ public class NamespaceService implements AutoCloseable {
                     .get(config.getMetadataStoreOperationTimeoutSeconds(), 
SECONDS);
 
             if (candidateBroker == null) {
-                Optional<LeaderBroker> currentLeader = 
pulsar.getLeaderElectionService().getCurrentLeader()
-                        // make a copy to avoid races, lookups will tolerate 
stale leader information
-                        .map(Function.identity());
+                Optional<LeaderBroker> currentLeader = 
pulsar.getLeaderElectionService().getCurrentLeader();
 
                 if (options.isAuthoritative()) {
                     // leader broker already assigned the current broker as 
owner
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java
index 6b9192845ab..488a8be2110 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java
@@ -52,7 +52,7 @@ public class RestProducerContext implements 
TopicEventsListener {
     public void handleEvent(String topicName, TopicEvent event, EventStage 
stage, Throwable t) {
         // remove topic from owning topics when it's unloaded
         if (event == TopicEvent.UNLOAD && stage == EventStage.SUCCESS) {
-            addOrRemoveTopic(topicName, event == TopicEvent.UNLOAD);
+            addOrRemoveTopic(topicName, true);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
index 553186e24eb..917bd1a3128 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
@@ -25,10 +25,13 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
@@ -370,7 +373,8 @@ public class LookupResultTest {
 
         URI request = URI.create("https://original-host:1234/admin");
 
-        assertThrows(NullPointerException.class, () -> 
result.toRedirectUri(request));
+        WebApplicationException e = 
expectThrows(WebApplicationException.class, () -> 
result.toRedirectUri(request));
+        assertEquals(e.getResponse().getStatus(), 
Response.Status.PRECONDITION_FAILED.getStatusCode());
     }
 
     @Test

Reply via email to