This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch 
lh-support-webservice-bindaddresses-and-advertised-listeners
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 005ae15df341e734582ac62c4a16d93d9f103f96
Author: Lari Hotari <[email protected]>
AuthorDate: Thu May 21 13:06:29 2026 +0300

    [improve][broker] Tighten listener validation and redirect URI handling
    
    Address review feedback on PR #221:
    
    - Rename MultipleListenerValidator.validateAndAnalysisAdvertisedListener
      to validateAndUpdateAdvertisedListeners and document the in-place
      mutation of internalListenerName.
    - Reject listener names containing characters that would need URL
      encoding; validation is now consistent between advertisedListeners
      and bindAddresses.
    - Trim whitespace around comma-separated entries in advertisedListeners
      and bindAddresses; skip empty entries from extra/trailing commas.
    - Support IPv6 literals in advertisedListeners/bindAddresses and in
      ServiceConfigurationUtils URL builders by wrapping bare IPv6 in
      brackets via Netty NetUtil.
    - Document that port 0 in bindAddresses is supported only for the
      internal listener (broker.conf, standalone.conf, ServiceConfiguration).
    - Refactor LookupResult.internalCreate to split listener URL overrides
      into a small UrlOverride helper for readability.
    - Add toLookupRedirectUri that injects the listenerName query parameter
      for topic-lookup redirects only; admin redirects no longer rewrite it.
    - Improve the 412 entity in LookupResult.toRedirectUri to include the
      resolved listener name for easier debugging.
    - Tolerate a missing web-service listener on the target broker
      (rolling-upgrade safety) and fall back to its default URL.
    
    New tests: ServiceConfigurationUtilsTest, 
NamespaceServiceListenerResolutionTest,
    plus whitespace, IPv6, and listener-name validation cases in the existing
    validator/lookup test classes.
---
 conf/broker.conf                                   |   1 +
 conf/standalone.conf                               |   1 +
 pulsar-broker-common/build.gradle.kts              |   1 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   3 +-
 .../pulsar/broker/ServiceConfigurationUtils.java   |  43 ++++--
 .../broker/validator/BindAddressValidator.java     |  15 +-
 .../validator/MultipleListenerValidator.java       |  72 +++++++--
 .../broker/ServiceConfigurationUtilsTest.java      |  83 +++++++++++
 .../broker/validator/BindAddressValidatorTest.java |  23 +++
 .../validator/MultipleListenerValidatorTest.java   |  60 ++++++--
 .../org/apache/pulsar/broker/PulsarService.java    |   6 +-
 .../RedirectManagerForLoadManagerMigration.java    |   4 +-
 .../apache/pulsar/broker/lookup/LookupResult.java  | 161 +++++++++++++--------
 .../pulsar/broker/lookup/TopicLookupBase.java      |   4 +-
 .../pulsar/broker/namespace/NamespaceService.java  |  15 +-
 .../pulsar/broker/lookup/LookupResultTest.java     |  61 ++++++++
 .../NamespaceServiceListenerResolutionTest.java    |  90 ++++++++++++
 17 files changed, 528 insertions(+), 115 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 0038cb22571..8ed01cc8656 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -90,6 +90,7 @@ bindAddress=0.0.0.0
 # Each ip:port may be bound by exactly one (listener, scheme) pair. An entry 
that exactly
 # matches the auto-derived internal-listener binding is tolerated; assigning 
the same ip:port
 # to a different listener or scheme fails validation.
+# Port 0 (OS-assigned ephemeral port) is supported only for the internal 
listener.
 bindAddresses=
 
 # Hostname or IP advertised to clients for the internal listener.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 69011aeb77e..38c5d1a5b89 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -64,6 +64,7 @@ bindAddress=0.0.0.0
 # Each ip:port may be bound by exactly one (listener, scheme) pair. An entry 
that exactly
 # matches the auto-derived internal-listener binding is tolerated; assigning 
the same ip:port
 # to a different listener or scheme fails validation.
+# Port 0 (OS-assigned ephemeral port) is supported only for the internal 
listener.
 bindAddresses=
 
 # Hostname or IP advertised to clients for the internal listener.
diff --git a/pulsar-broker-common/build.gradle.kts 
b/pulsar-broker-common/build.gradle.kts
index 1fa8c0437c7..6e2bc73feba 100644
--- a/pulsar-broker-common/build.gradle.kts
+++ b/pulsar-broker-common/build.gradle.kts
@@ -27,6 +27,7 @@ dependencies {
     implementation(libs.slog)
     implementation(libs.guava)
     implementation(libs.commons.lang3)
+    implementation(libs.netty.common)
     implementation(libs.bookkeeper.server)
     implementation(libs.opentelemetry.api)
     implementation(libs.simpleclient)
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3829f6d062c..96f2908a74d 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -293,7 +293,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
                     + " A local hostname is also accepted but not 
recommended.\n"
                     + " Each `ip:port` may be bound by exactly one (listener, 
scheme) pair. An entry"
                     + " that exactly matches the auto-derived 
internal-listener binding is tolerated;"
-                    + " assigning the same `ip:port` to a different listener 
or scheme fails validation.")
+                    + " assigning the same `ip:port` to a different listener 
or scheme fails validation.\n"
+                    + " Port `0` (OS-assigned ephemeral port) is supported 
only for the internal listener.")
     private String bindAddresses;
 
     @FieldContext(category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
index fb7d5d34706..893dc67425f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import io.netty.util.NetUtil;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
@@ -61,7 +62,7 @@ public class ServiceConfigurationUtils {
     public static String getAppliedAdvertisedAddress(ServiceConfiguration 
configuration,
                                                      boolean 
ignoreAdvertisedListener) {
         Map<String, AdvertisedListener> result = MultipleListenerValidator
-                .validateAndAnalysisAdvertisedListener(configuration);
+                .validateAndUpdateAdvertisedListeners(configuration);
 
         String advertisedAddress = configuration.getAdvertisedAddress();
         if (advertisedAddress != null) {
@@ -90,7 +91,7 @@ public class ServiceConfigurationUtils {
      */
     public static AdvertisedListener getInternalListener(ServiceConfiguration 
config, String protocol) {
         Map<String, AdvertisedListener> result = MultipleListenerValidator
-                .validateAndAnalysisAdvertisedListener(config);
+                .validateAndUpdateAdvertisedListeners(config);
         AdvertisedListener internal = 
result.get(config.getInternalListenerName());
         if (internal == null || !internal.hasUriForProtocol(protocol)) {
             // Search for an advertised listener for same protocol
@@ -114,8 +115,26 @@ public class ServiceConfigurationUtils {
         return internal;
     }
 
-    private static URI createUriOrNull(String scheme, String hostname, 
Optional<Integer> port) {
-        return port.map(p -> URI.create(String.format("%s://%s:%d", scheme, 
hostname, p))).orElse(null);
+    private static URI createUriOrNull(String scheme, String ipOrHost, 
Optional<Integer> port) {
+        return port.map(p -> URI.create(String.format("%s://%s:%d", scheme, 
formatHost(ipOrHost), p))).orElse(null);
+    }
+
+    /**
+     * Wrap bare IPv6 literals in square brackets so they parse correctly as 
the host component of a
+     * URL. IPv4 addresses and hostnames are returned unchanged, as is an IPv6 
literal that is
+     * already bracketed.
+     */
+    static String formatHost(String ipOrHost) {
+        if (ipOrHost == null || ipOrHost.isEmpty()) {
+            return ipOrHost;
+        }
+        if (ipOrHost.startsWith("[") && ipOrHost.endsWith("]")) {
+            return ipOrHost;
+        }
+        if (NetUtil.isValidIpV6Address(ipOrHost)) {
+            return "[" + ipOrHost + "]";
+        }
+        return ipOrHost;
     }
 
     private static URI firstNonNullUri(URI... uris) {
@@ -134,19 +153,19 @@ public class ServiceConfigurationUtils {
         return 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
     }
 
-    public static String brokerUrl(String host, int port) {
-        return String.format("pulsar://%s:%d", host, port);
+    public static String brokerUrl(String ipOrHost, int port) {
+        return String.format("pulsar://%s:%d", formatHost(ipOrHost), port);
     }
 
-    public static String brokerUrlTls(String host, int port) {
-        return String.format("pulsar+ssl://%s:%d", host, port);
+    public static String brokerUrlTls(String ipOrHost, int port) {
+        return String.format("pulsar+ssl://%s:%d", formatHost(ipOrHost), port);
     }
 
-    public static String webServiceUrl(String host, int port) {
-        return String.format("http://%s:%d";, host, port);
+    public static String webServiceUrl(String ipOrHost, int port) {
+        return String.format("http://%s:%d";, formatHost(ipOrHost), port);
     }
 
-    public static String webServiceUrlTls(String host, int port) {
-        return String.format("https://%s:%d";, host, port);
+    public static String webServiceUrlTls(String ipOrHost, int port) {
+        return String.format("https://%s:%d";, formatHost(ipOrHost), port);
     }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
index 41ae9452d2a..1df78984846 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
@@ -38,7 +38,7 @@ import org.apache.pulsar.common.configuration.BindAddress;
  */
 public class BindAddressValidator {
 
-    private static final Pattern BIND_ADDRESSES_PATTERN = 
Pattern.compile("(?<name>\\w+):(?<url>.+)$");
+    private static final Pattern BIND_ADDRESSES_PATTERN = 
Pattern.compile("(?<name>[^:]+):(?<url>.+)$");
 
     /**
      * Validate the configuration of `bindAddresses`.
@@ -64,8 +64,11 @@ public class BindAddressValidator {
         // migrate the legacy port-based configuration to bind addresses 
tagged with the internal listener name
         List<BindAddress> addresses = migrateBindAddresses(config, 
internalListenerName);
 
-        // parse the list of additional bind addresses
+        // parse the list of additional bind addresses; trim whitespace around 
the comma-separated
+        // entries so configurations split across multiple lines or padded for 
readability are accepted
         
Arrays.stream(StringUtils.split(StringUtils.defaultString(config.getBindAddresses()),
 ","))
+                .map(StringUtils::trim)
+                .filter(StringUtils::isNotEmpty)
                 .map(s -> {
                     Matcher m = BIND_ADDRESSES_PATTERN.matcher(s);
                     if (!m.matches()) {
@@ -73,7 +76,11 @@ public class BindAddressValidator {
                     }
                     return m;
                 })
-                .map(m -> new BindAddress(m.group("name"), 
URI.create(m.group("url"))))
+                .map(m -> {
+                    String name = StringUtils.trim(m.group("name"));
+                    MultipleListenerValidator.validateListenerName(name);
+                    return new BindAddress(name, 
URI.create(StringUtils.trim(m.group("url"))));
+                })
                 .forEach(addresses::add);
 
         // apply the filter
@@ -109,7 +116,7 @@ public class BindAddressValidator {
             if (addr.getAddress().getPort() == 0) {
                 continue;
             }
-            String ipPort = addr.getAddress().getHost() + ":" + 
addr.getAddress().getPort();
+            String ipPort = 
MultipleListenerValidator.formatHostPort(addr.getAddress());
             BindAddress prior = uniqueIpPort.putIfAbsent(ipPort, addr);
             if (prior != null) {
                 throw new IllegalArgumentException("bindAddresses: ip:port `" 
+ ipPort
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
index 73fbef9305d..5a0c4179ff2 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.validator;
 
+import io.netty.util.NetUtil;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -25,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -35,8 +37,51 @@ import 
org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
  */
 public final class MultipleListenerValidator {
 
+    /** Allowed listener-name characters: ASCII letters, digits, underscore, 
hyphen. */
+    private static final Pattern LISTENER_NAME_PATTERN = 
Pattern.compile("[A-Za-z0-9_-]+");
+
+    /**
+     * Format the host:port part of a URI for use as a uniqueness key and in 
error messages, wrapping
+     * IPv6 literals in brackets so that the colon separator is unambiguous. 
{@link URI#getHost()} may
+     * or may not include the brackets depending on the JDK, so they are 
stripped before the
+     * {@link NetUtil#isValidIpV6Address} check.
+     */
+    static String formatHostPort(URI uri) {
+        String host = uri.getHost();
+        if (host == null) {
+            return host + ":" + uri.getPort();
+        }
+        String unbracketed = host.startsWith("[") && host.endsWith("]")
+                ? host.substring(1, host.length() - 1) : host;
+        if (NetUtil.isValidIpV6Address(unbracketed)) {
+            return "[" + unbracketed + "]:" + uri.getPort();
+        }
+        return host + ":" + uri.getPort();
+    }
+
     /**
-     * Validate the configuration of `advertisedListeners` and 
`internalListenerName`.
+     * Validate a listener name. Listener names must be non-blank and contain 
only ASCII letters,
+     * digits, underscore, and hyphen so they are safe to embed in URLs 
without encoding.
+     *
+     * @throws IllegalArgumentException if the name is null, blank, or 
contains disallowed characters.
+     */
+    public static void validateListenerName(String name) {
+        if (StringUtils.isBlank(name)) {
+            throw new IllegalArgumentException("listener name must not be 
blank");
+        }
+        if (!LISTENER_NAME_PATTERN.matcher(name).matches()) {
+            throw new IllegalArgumentException("listener name `" + name + "` 
must contain only ASCII"
+                    + " letters, digits, underscore, or hyphen");
+        }
+    }
+
+    /**
+     * Validate `advertisedListeners` and `internalListenerName`, returning 
the parsed listener map.
+     * <p>
+     * This method mutates the supplied {@link ServiceConfiguration}: when 
{@code internalListenerName}
+     * is blank, it is written back with the resolved fallback value (the 
first parsed listener if any,
+     * otherwise {@value ServiceConfiguration#DEFAULT_INTERNAL_LISTENER_NAME}) 
so that subsequent reads
+     * from the config see the effective value.
      * <ol>
      * <li>`advertisedListeners` is a comma-separated list of endpoints in the 
form
      *     `listener:scheme://host:port`. Supported schemes are `pulsar`, 
`pulsar+ssl`, `http`, and `https`.
@@ -51,16 +96,18 @@ public final class MultipleListenerValidator {
      *     internal listener to override individual URLs (e.g. a custom TLS 
hostname) while still
      *     benefiting from auto-population for the rest.
      * </ol>
-     * @param config the pulsar broker configuration.
+     * @param config the pulsar broker configuration; updated in place when
+     *               {@code internalListenerName} is blank.
      * @return the parsed and validated advertised listeners, keyed by 
listener name.
      */
-    public static Map<String, AdvertisedListener> 
validateAndAnalysisAdvertisedListener(ServiceConfiguration config) {
+    public static Map<String, AdvertisedListener> 
validateAndUpdateAdvertisedListeners(ServiceConfiguration config) {
         Map<String, AdvertisedListener> result = 
parseAdvertisedListeners(config);
 
-        // Resolve `internalListenerName` if the user left it blank. For 
backward compatibility,
-        // prefer the first entry parsed from `advertisedListeners`; otherwise 
fall back to the
-        // constant default ("internal"). The field's own default is also 
"internal", so this
-        // branch is only taken when the user explicitly set the property to 
an empty string.
+        // Resolve `internalListenerName` if the user left it blank (null, 
empty, or whitespace).
+        // For backward compatibility, prefer the first entry parsed from 
`advertisedListeners`;
+        // otherwise fall back to the constant default ("internal"). The 
field's own default is
+        // also "internal", so this branch is normally taken only when the 
user explicitly set the
+        // property to an empty string.
         if (StringUtils.isBlank(config.getInternalListenerName())) {
             String fallback = result.isEmpty()
                     ? ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME
@@ -117,13 +164,20 @@ public final class MultipleListenerValidator {
             return new LinkedHashMap<>();
         }
         Map<String, List<String>> listeners = new LinkedHashMap<>();
-        for (final String str : 
StringUtils.split(config.getAdvertisedListeners(), ",")) {
+        // Trim whitespace around comma-separated entries so configurations 
split across multiple
+        // lines or padded for readability are accepted, and skip empties.
+        for (final String raw : 
StringUtils.split(config.getAdvertisedListeners(), ",")) {
+            String str = StringUtils.trim(raw);
+            if (StringUtils.isEmpty(str)) {
+                continue;
+            }
             int index = str.indexOf(":");
             if (index <= 0) {
                 throw new IllegalArgumentException("the configure entry 
`advertisedListeners` is invalid. because "
                         + str + " do not contain listener name");
             }
             String listenerName = StringUtils.trim(str.substring(0, index));
+            validateListenerName(listenerName);
             String value = StringUtils.trim(str.substring(index + 1));
             listeners.computeIfAbsent(listenerName, k -> new ArrayList<>(2));
             listeners.get(listenerName).add(value);
@@ -169,7 +223,7 @@ public final class MultipleListenerValidator {
                         }
                     }
 
-                    String hostPort = String.format("%s:%d", uri.getHost(), 
uri.getPort());
+                    String hostPort = formatHostPort(uri);
                     Set<String> sets = 
reverseMappings.computeIfAbsent(hostPort, k -> new TreeSet<>());
                     sets.add(entry.getKey());
                     if (sets.size() > 1) {
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/ServiceConfigurationUtilsTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/ServiceConfigurationUtilsTest.java
new file mode 100644
index 00000000000..1dccb5ab77a
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/ServiceConfigurationUtilsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.testng.Assert.assertEquals;
+import java.net.URI;
+import org.testng.annotations.Test;
+
+public class ServiceConfigurationUtilsTest {
+
+    @Test
+    public void testBrokerUrlWithHostname() {
+        
assertEquals(ServiceConfigurationUtils.brokerUrl("broker-1.example.com", 6650),
+                "pulsar://broker-1.example.com:6650");
+    }
+
+    @Test
+    public void testBrokerUrlWithIpv4() {
+        assertEquals(ServiceConfigurationUtils.brokerUrl("10.0.0.1", 6650), 
"pulsar://10.0.0.1:6650");
+    }
+
+    @Test
+    public void testBrokerUrlWithIpv6Bare() {
+        // A bare IPv6 literal must be wrapped in brackets so the resulting 
URL parses correctly.
+        String url = ServiceConfigurationUtils.brokerUrl("::1", 6650);
+        assertEquals(url, "pulsar://[::1]:6650");
+        URI parsed = URI.create(url);
+        assertEquals(parsed.getPort(), 6650);
+    }
+
+    @Test
+    public void testBrokerUrlTlsWithIpv6Bare() {
+        String url = ServiceConfigurationUtils.brokerUrlTls("fe80::1", 6651);
+        assertEquals(url, "pulsar+ssl://[fe80::1]:6651");
+        assertEquals(URI.create(url).getPort(), 6651);
+    }
+
+    @Test
+    public void testWebServiceUrlWithIpv6Bare() {
+        String url = ServiceConfigurationUtils.webServiceUrl("2001:db8::1", 
8080);
+        assertEquals(url, "http://[2001:db8::1]:8080";);
+        assertEquals(URI.create(url).getPort(), 8080);
+    }
+
+    @Test
+    public void testWebServiceUrlTlsWithIpv6Bare() {
+        String url = ServiceConfigurationUtils.webServiceUrlTls("2001:db8::1", 
8443);
+        assertEquals(url, "https://[2001:db8::1]:8443";);
+        assertEquals(URI.create(url).getPort(), 8443);
+    }
+
+    @Test
+    public void testIpv6AlreadyBracketedIsNotDoubleWrapped() {
+        // If the caller passed a pre-bracketed IPv6 literal, leave it as-is 
rather than producing
+        // `pulsar://[[::1]]:6650`.
+        String url = ServiceConfigurationUtils.brokerUrl("[::1]", 6650);
+        assertEquals(url, "pulsar://[::1]:6650");
+        assertEquals(URI.create(url).getPort(), 6650);
+    }
+
+    @Test
+    public void testIpv4InUrlIsNotBracketed() {
+        // IPv4 addresses must not be bracketed.
+        String url = ServiceConfigurationUtils.webServiceUrl("0.0.0.0", 8080);
+        assertEquals(url, "http://0.0.0.0:8080";);
+    }
+}
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
index 3495cb1b74c..3aa79f7fa40 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
@@ -234,4 +234,27 @@ public class BindAddressValidatorTest {
         assertTrue(e.getMessage().contains("0.0.0.0:8080"),
                 "expected ip:port in message but got: " + e.getMessage());
     }
+
+    @Test
+    public void testWhitespaceBetweenEntriesIsTrimmed() {
+        ServiceConfiguration config = newEmptyConfiguration();
+        // Spaces, newlines, and tabs around the comma separators and around 
the entries themselves
+        // must all be tolerated and trimmed.
+        config.setBindAddresses("  internal:pulsar://0.0.0.0:6650 
,\n\texternal:pulsar+ssl://0.0.0.0:6651  ");
+        List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("external", 
URI.create("pulsar+ssl://0.0.0.0:6651"))));
+    }
+
+    @Test
+    public void testEmptyEntriesAreSkipped() {
+        ServiceConfiguration config = newEmptyConfiguration();
+        // A trailing comma or an extra comma in the middle should be ignored, 
not flagged as malformed.
+        config.setBindAddresses("internal:pulsar://0.0.0.0:6650, 
,external:pulsar+ssl://0.0.0.0:6651,");
+        List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("external", 
URI.create("pulsar+ssl://0.0.0.0:6651"))));
+    }
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
index 98fbe6cc1e8..d66c917f9b4 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -71,7 +71,7 @@ public class MultipleListenerValidatorTest {
         ServiceConfiguration config = newConfigWithNoPorts();
         config.setInternalListenerName(null);
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, 
internal:pulsar+ssl://127.0.0.1:6651");
-        
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         assertEquals("internal", config.getInternalListenerName());
     }
 
@@ -85,7 +85,7 @@ public class MultipleListenerValidatorTest {
         config.setAdvertisedListeners(
                 
"region1:pulsar://region1.example.com:6650,region2:pulsar://region2.example.com:6650");
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         // `region1` is the first declared, so it becomes the internal 
listener.
         assertEquals(config.getInternalListenerName(), "region1");
         // Both region entries are preserved in the result map.
@@ -103,7 +103,7 @@ public class MultipleListenerValidatorTest {
         config.setBrokerServicePort(Optional.of(6650));
         config.setWebServicePort(Optional.of(8080));
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         assertEquals(config.getInternalListenerName(), 
ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
         
assertNotNull(listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME),
                 "the legacy-port-synthesized internal listener should be 
present");
@@ -113,7 +113,7 @@ public class MultipleListenerValidatorTest {
     public void testMalformedListener() {
         ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(":pulsar://127.0.0.1:6660");
-        
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -122,7 +122,7 @@ public class MultipleListenerValidatorTest {
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, 
internal:pulsar+ssl://127.0.0.1:6651,"
                 + " internal:pulsar://192.168.1.11:6660, 
internal:pulsar+ssl://192.168.1.11:6651");
         config.setInternalListenerName("internal");
-        
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -130,7 +130,7 @@ public class MultipleListenerValidatorTest {
         ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " 
internal:pulsar://192.168.1.11:6660");
         config.setInternalListenerName("internal");
-        
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -139,7 +139,7 @@ public class MultipleListenerValidatorTest {
         config.setAdvertisedListeners(" internal:pulsar+ssl://127.0.0.1:6661,"
                 + " internal:pulsar+ssl://192.168.1.11:6661");
         config.setInternalListenerName("internal");
-        
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -147,7 +147,7 @@ public class MultipleListenerValidatorTest {
         ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " 
external:pulsar://127.0.0.1:6660");
         config.setInternalListenerName("internal");
-        
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
     }
 
     @Test
@@ -158,7 +158,7 @@ public class MultipleListenerValidatorTest {
         config.setBrokerServicePort(Optional.of(6650));
         config.setWebServicePort(Optional.of(8080));
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         AdvertisedListener internal = 
listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
         assertNotNull(internal, "expected an `internal` listener to be 
synthesized from legacy ports");
         assertEquals(internal.getBrokerServiceUrl(), 
URI.create("pulsar://broker-1.example.com:6650"));
@@ -176,7 +176,7 @@ public class MultipleListenerValidatorTest {
         config.setWebServicePort(Optional.of(8080));
         config.setWebServicePortTls(Optional.of(8443));
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         AdvertisedListener internal = 
listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
         assertNotNull(internal);
         assertEquals(internal.getBrokerServiceUrl(), 
URI.create("pulsar://broker-1.example.com:6650"));
@@ -196,7 +196,7 @@ public class MultipleListenerValidatorTest {
         
config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660,"
                 + "region1:pulsar+ssl://region1.example.com:6661");
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         assertEquals(listeners.size(), 2);
         AdvertisedListener region1 = listeners.get("region1");
         assertNotNull(region1);
@@ -219,7 +219,7 @@ public class MultipleListenerValidatorTest {
         
config.setAdvertisedListeners("internal:pulsar://explicit.example.com:6660,"
                 + "internal:http://explicit.example.com:8888";);
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         assertEquals(listeners.size(), 1);
         AdvertisedListener internal = listeners.get("internal");
         assertNotNull(internal);
@@ -243,7 +243,7 @@ public class MultipleListenerValidatorTest {
         config.setWebServicePortTls(Optional.of(8443));
         
config.setAdvertisedListeners("internal:pulsar://override.example.com:6660");
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         AdvertisedListener internal = listeners.get("internal");
         assertNotNull(internal);
         // Overridden by the explicit advertisedListeners entry:
@@ -265,7 +265,7 @@ public class MultipleListenerValidatorTest {
         
config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660,"
                 + "region1:http://region1.example.com:8888";);
         Map<String, AdvertisedListener> listeners =
-                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
         assertEquals(listeners.size(), 1);
         AdvertisedListener region1 = listeners.get("region1");
         assertNotNull(region1);
@@ -280,7 +280,7 @@ public class MultipleListenerValidatorTest {
         config.setInternalListenerName("region1");
         
config.setAdvertisedListeners("region2:pulsar://region2.example.com:6660");
         IllegalArgumentException e = 
expectThrows(IllegalArgumentException.class,
-                () -> 
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config));
+                () -> 
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config));
         assertTrue(e.getMessage().contains("region1"), "expected error to 
mention the listener name: "
                 + e.getMessage());
     }
@@ -291,6 +291,36 @@ public class MultipleListenerValidatorTest {
         assertEquals(config.getInternalListenerName(), 
ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
     }
 
+    @Test
+    public void testWhitespaceBetweenEntriesIsTrimmed() {
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName("internal");
+        // Spaces, newlines, and tabs around the comma separators and around 
the entries themselves
+        // must all be tolerated and trimmed.
+        config.setAdvertisedListeners(
+                "  internal:pulsar://broker-1:6650 
,\n\texternal:pulsar://broker-1.public:6650  ");
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
+        assertEquals(listeners.size(), 2);
+        assertEquals(listeners.get("internal").getBrokerServiceUrl(), 
URI.create("pulsar://broker-1:6650"));
+        assertEquals(listeners.get("external").getBrokerServiceUrl(),
+                URI.create("pulsar://broker-1.public:6650"));
+    }
+
+    @Test
+    public void testEmptyEntriesAreSkipped() {
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName("internal");
+        // A trailing comma or an extra comma in the middle should be ignored.
+        config.setAdvertisedListeners("internal:pulsar://broker-1:6650, 
,external:pulsar://broker-1.public:6650,");
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
+        assertEquals(listeners.size(), 2);
+        assertEquals(listeners.get("internal").getBrokerServiceUrl(), 
URI.create("pulsar://broker-1:6650"));
+        assertEquals(listeners.get("external").getBrokerServiceUrl(),
+                URI.create("pulsar://broker-1.public:6650"));
+    }
+
     private ServiceConfiguration newConfigWithNoPorts() {
         ServiceConfiguration config = new ServiceConfiguration();
         config.setBrokerServicePort(Optional.empty());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f85d12116ef..6a46fb3760d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -368,7 +368,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         this.openTelemetry = new PulsarBrokerOpenTelemetry(config, 
openTelemetrySdkBuilderCustomizer);
 
         // validate `advertisedAddress`, `advertisedListeners`, 
`internalListenerName`
-        this.advertisedListeners = 
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        this.advertisedListeners = 
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
 
         // the advertised address is defined as the host component of the 
broker's canonical name.
         this.advertisedAddress = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
@@ -976,12 +976,12 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             // necessary both for dynamic ports (`Optional.of(0)`) and for the 
case where the broker
             // is configured only via `bindAddresses` (legacy port properties 
left as
             // `Optional.empty()`), so that downstream code — in particular
-            // MultipleListenerValidator.validateAndAnalysisAdvertisedListener 
— can synthesize the
+            // MultipleListenerValidator.validateAndUpdateAdvertisedListeners 
— can synthesize the
             // internal advertised listener from the now-known ports.
             brokerService.getListenPort().ifPresent(port -> 
config.setBrokerServicePort(Optional.of(port)));
             brokerService.getListenPortTls().ifPresent(port -> 
config.setBrokerServicePortTls(Optional.of(port)));
             // Recompute the cached advertised listener map now that the bound 
ports are known.
-            this.advertisedListeners = 
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+            this.advertisedListeners = 
MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config);
             this.webServiceAddress = webAddress(config);
             this.webServiceAddressTls = webAddressTls(config);
             this.brokerServiceUrl = brokerUrl(config);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java
index e84a6aecb95..13e591f87cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java
@@ -89,8 +89,8 @@ public class RedirectManagerForLoadManagerMigration {
      * @param options lookup options
      * @return lookup result
      */
-    public CompletableFuture<Optional<LookupResult>> 
redirectIfLoadBalancerOnBrokerIsNotExpected
-            (LookupOptions options) {
+    public CompletableFuture<Optional<LookupResult>> 
redirectIfLoadBalancerOnBrokerIsNotExpected(
+            LookupOptions options) {
         if (!pulsar.getConfiguration().isLoadManagerMigrationEnabled()) {
             // no-op when load manager migration is disabled.
             return CompletableFuture.completedFuture(Optional.empty());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
index c45614a8be3..f27cf763a96 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
@@ -22,6 +22,8 @@ import static 
org.apache.pulsar.broker.lookup.v2.TopicLookup.LISTENERNAME_PARAM;
 import java.net.URI;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
@@ -125,56 +127,27 @@ public class LookupResult {
                                                String webServiceUrlTls, String 
pulsarServiceUrl,
                                                String pulsarServiceUrlTls,
                                                Map<String, AdvertisedListener> 
advertisedListeners) {
-        String httpUrl = webServiceUrl;
-        String httpUrlTls = webServiceUrlTls;
-        String brokerServiceUrl = pulsarServiceUrl;
-        String brokerServiceUrlTls = pulsarServiceUrlTls;
-        String brokerServiceListenerName = null;
-        String webServiceListenerName = null;
-
-        if (options != null && options.hasAdvertisedListenerName()) {
-            String advertisedListenerName = 
options.getAdvertisedListenerName();
-            var advertisedListener = 
advertisedListeners.get(advertisedListenerName);
-            if (advertisedListener != null) {
-                brokerServiceListenerName = advertisedListenerName;
-                brokerServiceUrl = null;
-                brokerServiceUrlTls = null;
-                if (advertisedListener.getBrokerServiceUrl() != null) {
-                    brokerServiceUrl = 
advertisedListener.getBrokerServiceUrl().toString();
-                }
-                if (advertisedListener.getBrokerServiceUrlTls() != null) {
-                    brokerServiceUrlTls = 
advertisedListener.getBrokerServiceUrlTls().toString();
-                }
-            }
+        UrlOverride urls = new UrlOverride(webServiceUrl, webServiceUrlTls, 
pulsarServiceUrl, pulsarServiceUrlTls);
+
+        AdvertisedListener brokerListener = lookupListener(options, 
advertisedListeners,
+                LookupOptions::hasAdvertisedListenerName, 
LookupOptions::getAdvertisedListenerName);
+        if (brokerListener != null) {
+            urls.brokerServiceListenerName = 
options.getAdvertisedListenerName();
+            urls.applyBrokerServiceOverride(brokerListener);
         }
 
-        if (options != null && options.hasWebServiceAdvertisedListenerName()) {
-            String advertisedListenerName = 
options.getWebServiceAdvertisedListenerName();
-            var advertisedListener = 
advertisedListeners.get(advertisedListenerName);
-            if (advertisedListener != null) {
-                webServiceListenerName = advertisedListenerName;
-                httpUrl = null;
-                httpUrlTls = null;
-                if (advertisedListener.getBrokerHttpUrl() != null) {
-                    httpUrl = advertisedListener.getBrokerHttpUrl().toString();
-                }
-                if (advertisedListener.getBrokerHttpsUrl() != null) {
-                    httpUrlTls = 
advertisedListener.getBrokerHttpsUrl().toString();
-                }
-                // default to use the webServiceAdvertisedListenerName as the 
brokerServiceListenerName if there
-                // is a brokerServiceUrl or brokerServiceUrlTls configured for 
the webServiceAdvertisedListenerName
-                if (brokerServiceListenerName == null && 
(advertisedListener.getBrokerServiceUrl() != null
-                        || advertisedListener.getBrokerServiceUrlTls() != 
null)) {
-                    brokerServiceListenerName = advertisedListenerName;
-                    brokerServiceUrl = null;
-                    brokerServiceUrlTls = null;
-                    if (advertisedListener.getBrokerServiceUrl() != null) {
-                        brokerServiceUrl = 
advertisedListener.getBrokerServiceUrl().toString();
-                    }
-                    if (advertisedListener.getBrokerServiceUrlTls() != null) {
-                        brokerServiceUrlTls = 
advertisedListener.getBrokerServiceUrlTls().toString();
-                    }
-                }
+        AdvertisedListener webListener = lookupListener(options, 
advertisedListeners,
+                LookupOptions::hasWebServiceAdvertisedListenerName,
+                LookupOptions::getWebServiceAdvertisedListenerName);
+        if (webListener != null) {
+            urls.webServiceListenerName = 
options.getWebServiceAdvertisedListenerName();
+            urls.applyWebServiceOverride(webListener);
+            // default the brokerServiceListenerName to the 
webServiceAdvertisedListenerName if the
+            // listener also configures broker service URLs and no separate 
advertisedListenerName was given
+            if (urls.brokerServiceListenerName == null
+                    && (webListener.getBrokerServiceUrl() != null || 
webListener.getBrokerServiceUrlTls() != null)) {
+                urls.brokerServiceListenerName = 
options.getWebServiceAdvertisedListenerName();
+                urls.applyBrokerServiceOverride(webListener);
             }
         }
 
@@ -192,16 +165,55 @@ public class LookupResult {
         return builder()
                 .type(type)
                 .brokerId(brokerId)
-                .httpUrl(httpUrl)
-                .httpUrlTls(httpUrlTls)
-                .brokerServiceUrl(brokerServiceUrl)
-                .brokerServiceUrlTls(brokerServiceUrlTls)
+                .httpUrl(urls.httpUrl)
+                .httpUrlTls(urls.httpUrlTls)
+                .brokerServiceUrl(urls.brokerServiceUrl)
+                .brokerServiceUrlTls(urls.brokerServiceUrlTls)
                 .authoritativeRedirect(authoritativeRedirect)
-                .brokerServiceListenerName(brokerServiceListenerName)
-                .webServiceListenerName(webServiceListenerName)
+                .brokerServiceListenerName(urls.brokerServiceListenerName)
+                .webServiceListenerName(urls.webServiceListenerName)
                 .build();
     }
 
+    private static AdvertisedListener lookupListener(LookupOptions options,
+                                                     Map<String, 
AdvertisedListener> advertisedListeners,
+                                                     Predicate<LookupOptions> 
hasName,
+                                                     Function<LookupOptions, 
String> getName) {
+        if (options == null || !hasName.test(options)) {
+            return null;
+        }
+        return advertisedListeners.get(getName.apply(options));
+    }
+
+    /** Mutable URL-set used while resolving listener-specific overrides for a 
LookupResult. */
+    private static final class UrlOverride {
+        String httpUrl;
+        String httpUrlTls;
+        String brokerServiceUrl;
+        String brokerServiceUrlTls;
+        String brokerServiceListenerName;
+        String webServiceListenerName;
+
+        UrlOverride(String httpUrl, String httpUrlTls, String 
brokerServiceUrl, String brokerServiceUrlTls) {
+            this.httpUrl = httpUrl;
+            this.httpUrlTls = httpUrlTls;
+            this.brokerServiceUrl = brokerServiceUrl;
+            this.brokerServiceUrlTls = brokerServiceUrlTls;
+        }
+
+        void applyBrokerServiceOverride(AdvertisedListener listener) {
+            brokerServiceUrl = listener.getBrokerServiceUrl() != null
+                    ? listener.getBrokerServiceUrl().toString() : null;
+            brokerServiceUrlTls = listener.getBrokerServiceUrlTls() != null
+                    ? listener.getBrokerServiceUrlTls().toString() : null;
+        }
+
+        void applyWebServiceOverride(AdvertisedListener listener) {
+            httpUrl = listener.getBrokerHttpUrl() != null ? 
listener.getBrokerHttpUrl().toString() : null;
+            httpUrlTls = listener.getBrokerHttpsUrl() != null ? 
listener.getBrokerHttpsUrl().toString() : null;
+        }
+    }
+
     public boolean isBrokerUrl() {
         return type == Type.BrokerUrl;
     }
@@ -233,7 +245,7 @@ public class LookupResult {
      * @return the redirect URI
      */
     public URI toRedirectUri(URI requestUri) {
-        return toRedirectUriInternal(requestUri, this.authoritativeRedirect);
+        return toRedirectUriInternal(requestUri, this.authoritativeRedirect, 
false);
     }
 
     /**
@@ -242,18 +254,40 @@ public class LookupResult {
      * redirect as authoritative regardless of what the lookup result carried).
      */
     public URI toRedirectUri(URI requestUri, boolean 
authoritativeRedirectOverride) {
-        return toRedirectUriInternal(requestUri, 
authoritativeRedirectOverride);
+        return toRedirectUriInternal(requestUri, 
authoritativeRedirectOverride, false);
     }
 
-    private URI toRedirectUriInternal(URI requestUri, boolean 
authoritativeRedirect) {
+    /**
+     * Same as {@link #toRedirectUri(URI)} but specialised for topic-lookup 
redirects: when this
+     * {@code LookupResult} has a resolved {@code brokerServiceListenerName} 
it is always written to
+     * the {@code listenerName} query parameter on the redirect URI. This 
handles the case where the
+     * original lookup request carried the listener name in a header rather 
than as a query
+     * parameter — the header does not survive an HTTP redirect, so the 
parameter form must be
+     * propagated to the next broker. Other (non-lookup) redirect paths use 
{@link #toRedirectUri}
+     * and leave the parameter alone.
+     */
+    public URI toLookupRedirectUri(URI requestUri) {
+        return toRedirectUriInternal(requestUri, this.authoritativeRedirect, 
true);
+    }
+
+    private URI toRedirectUriInternal(URI requestUri, boolean 
authoritativeRedirect,
+                                      boolean injectListenerNameQueryParam) {
         boolean requireHttps = 
"https".equalsIgnoreCase(requestUri.getScheme());
         String webServiceUrl = requireHttps ? lookupData.getHttpUrlTls() : 
lookupData.getHttpUrl();
         if (webServiceUrl == null) {
             // Preserve the legacy 412 error semantics when the redirect 
target broker has no URL
             // configured for the requested scheme.
+            String scheme = requireHttps ? "https" : "http";
+            StringBuilder entity = new StringBuilder()
+                    .append("No ").append(scheme).append(" URL configured for 
broker ")
+                    .append(lookupData.getBrokerId());
+            if (StringUtils.isNotBlank(webServiceListenerName)) {
+                entity.append(" on web service listener 
`").append(webServiceListenerName).append("`");
+            } else if (StringUtils.isNotBlank(brokerServiceListenerName)) {
+                entity.append(" on listener 
`").append(brokerServiceListenerName).append("`");
+            }
             throw new 
WebApplicationException(Response.status(Response.Status.PRECONDITION_FAILED)
-                    .entity("No " + (requireHttps ? "https" : "http")
-                            + " URL configured for broker " + 
lookupData.getBrokerId())
+                    .entity(entity.toString())
                     .build());
         }
         URI webServiceUri = URI.create(webServiceUrl);
@@ -269,9 +303,12 @@ public class LookupResult {
             // remove the parameter when the type is not redirect
             uriBuilder.replaceQueryParam("authoritative");
         }
-        // override the listener parameter only when the lookup result 
specifies one;
-        // otherwise leave the original request's listenerName query parameter 
untouched
-        if (StringUtils.isNotBlank(brokerServiceListenerName)) {
+        // Only set the listenerName query parameter on topic-lookup 
redirects. The original lookup
+        // request can carry it either as a query parameter or as a header; 
the latter does not
+        // survive an HTTP redirect, so the resolved listener name must be 
reinjected as a query
+        // parameter so the next broker sees it. Other redirect paths (admin 
endpoints) do not
+        // understand `listenerName` as a parameter, so for those we leave the 
request URI alone.
+        if (injectListenerNameQueryParam && 
StringUtils.isNotBlank(brokerServiceListenerName)) {
             uriBuilder.replaceQueryParam(LISTENERNAME_PARAM, 
brokerServiceListenerName);
         }
         return uriBuilder.build();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 72bf6d14c31..dd08fc10e91 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -108,7 +108,7 @@ public class TopicLookupBase extends PulsarWebResource {
                         LookupResult lookupResult = optionalResult.get();
 
                         if (lookupResult.isLoadManagerMigration()) {
-                            URI redirectUri = 
lookupResult.toRedirectUri(uri.getRequestUri());
+                            URI redirectUri = 
lookupResult.toLookupRedirectUri(uri.getRequestUri());
                             log.debug().log("Redirecting to a broker with the 
expected load manager.");
                             throw new 
WebApplicationException(Response.temporaryRedirect(redirectUri).build());
                         }
@@ -121,7 +121,7 @@ public class TopicLookupBase extends PulsarWebResource {
                                 throw new UnsupportedOperationException(
                                         "This implementation expects that the 
requestUri is a topic lookup.");
                             }
-                            URI redirectUri = 
lookupResult.toRedirectUri(requestUri);
+                            URI redirectUri = 
lookupResult.toLookupRedirectUri(requestUri);
                             log.debug()
                                     .attr("topic", topicName)
                                     .attr("redirectUri", redirectUri)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index db6ce90c203..fb7c3d68bf6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -263,8 +263,8 @@ public class NamespaceService implements AutoCloseable {
         return future;
     }
 
-    private CompletableFuture<Optional<LookupResult>> 
redirectIfLoadBalancerOnBrokerIsNotExpected
-            (ServiceUnitId bundle, LookupOptions options) {
+    private CompletableFuture<Optional<LookupResult>> 
redirectIfLoadBalancerOnBrokerIsNotExpected(
+            ServiceUnitId bundle, LookupOptions options) {
         if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) 
{
             return CompletableFuture.completedFuture(Optional.empty());
         }
@@ -529,8 +529,9 @@ public class NamespaceService implements AutoCloseable {
         return CompletableFuture.completedFuture(null);
     }
 
-    private static void resolveBrokerServiceLookupResult(LookupOptions 
options, NamespaceEphemeralData nsData,
-                                                         
CompletableFuture<Optional<LookupResult>> future) {
+    // package-private for tests
+    static void resolveBrokerServiceLookupResult(LookupOptions options, 
NamespaceEphemeralData nsData,
+                                                 
CompletableFuture<Optional<LookupResult>> future) {
         LookupResult result = LookupResult.create(nsData, options);
 
         // fail the lookup if advertised listener name is provided and does 
not match
@@ -542,12 +543,16 @@ public class NamespaceService implements AutoCloseable {
             return;
         }
 
+        // Tolerate a missing web service listener on the target broker — 
during a rolling cluster
+        // upgrade, older brokers may not have published the listener yet. 
Fall back to the default
+        // web service URL (the broker's primary HTTP/HTTPS) in that case; 
toRedirectUri picks it up
+        // because httpUrl/httpUrlTls were left at the broker's defaults by 
LookupResult.create.
         if (options.hasWebServiceAdvertisedListenerName()
                 && !Objects.equals(result.getWebServiceListenerName(), 
options.getWebServiceAdvertisedListenerName())) {
             log.warn()
                     .attr("brokerId", result.getLookupData().getBrokerId())
                     .attr("webServiceListenerName", 
options.getWebServiceAdvertisedListenerName())
-                    .log("The broker doesn't have the listener configured.");
+                    .log("Target broker has no matching web service listener; 
redirecting to its default URL.");
         }
 
         future.complete(Optional.of(result));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
index 917bd1a3128..0571fa6915e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java
@@ -442,6 +442,67 @@ public class LookupResultTest {
         assertTrue(query.contains("other=1"));
     }
 
+    @Test
+    public void testToLookupRedirectUriInjectsListenerNameWhenResolved() {
+        // Topic-lookup case: the original request may carry the listener name 
in a header that the
+        // redirect does not preserve. toLookupRedirectUri must inject it as a 
query parameter so the
+        // next broker sees it.
+        LookupResult result = LookupResult.builder()
+                .type(LookupResult.Type.RedirectUrl)
+                .httpUrl(WEB_URL)
+                .brokerServiceListenerName("external")
+                .build();
+
+        URI request = 
URI.create("http://original-host:1234/lookup/v2/topic/persistent/p/d/t";);
+
+        URI redirect = result.toLookupRedirectUri(request);
+
+        String query = redirect.getQuery();
+        assertNotNull(query);
+        assertTrue(query.contains("listenerName=external"), "query=" + query);
+    }
+
+    @Test
+    public void testToRedirectUriDoesNotInjectListenerNameForAdminPaths() {
+        // Admin redirects do not understand the listenerName query parameter 
— even if the
+        // LookupResult has a resolved brokerServiceListenerName, 
toRedirectUri must leave the
+        // request's query string alone.
+        LookupResult result = LookupResult.builder()
+                .type(LookupResult.Type.RedirectUrl)
+                .httpUrl(WEB_URL)
+                .brokerServiceListenerName("external")
+                .build();
+
+        URI request = 
URI.create("http://original-host:1234/admin/v2/persistent/p/d/t";);
+
+        URI redirect = result.toRedirectUri(request);
+
+        String query = redirect.getQuery();
+        // no listenerName query param added
+        if (query != null) {
+            assertFalse(query.contains("listenerName"), "query=" + query);
+        }
+    }
+
+    @Test
+    public void testToRedirectUriPreservesExistingListenerNameForAdminPaths() {
+        // If the original admin request already carries listenerName in the 
query string, leave it
+        // there rather than dropping it.
+        LookupResult result = LookupResult.builder()
+                .type(LookupResult.Type.RedirectUrl)
+                .httpUrl(WEB_URL)
+                .brokerServiceListenerName("external")
+                .build();
+
+        URI request = 
URI.create("http://original-host:1234/admin?listenerName=fromClient";);
+
+        URI redirect = result.toRedirectUri(request);
+
+        String query = redirect.getQuery();
+        assertNotNull(query);
+        assertTrue(query.contains("listenerName=fromClient"), "query=" + 
query);
+    }
+
     @Test
     public void testToStringIncludesAllFields() {
         LookupResult result = LookupResult.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceListenerResolutionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceListenerResolutionTest.java
new file mode 100644
index 00000000000..708a6c0cadf
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceListenerResolutionTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link NamespaceService#resolveBrokerServiceLookupResult}.
+ */
+@Test(groups = "broker")
+public class NamespaceServiceListenerResolutionTest {
+
+    @Test
+    public void testFailsOnBrokerServiceListenerMismatch() {
+        NamespaceEphemeralData nsData = new 
NamespaceEphemeralData("broker-1:8080",
+                "pulsar://broker-1:6650", null, "http://broker-1:8080";, null, 
false,
+                new HashMap<>());
+        LookupOptions options = 
LookupOptions.builder().advertisedListenerName("missing").build();
+        CompletableFuture<Optional<LookupResult>> future = new 
CompletableFuture<>();
+        NamespaceService.resolveBrokerServiceLookupResult(options, nsData, 
future);
+        assertTrue(future.isCompletedExceptionally());
+        Throwable cause = expectThrows(Exception.class, 
future::get).getCause();
+        assertTrue(cause instanceof PulsarServerException, "unexpected cause: 
" + cause);
+        assertTrue(cause.getMessage().contains("'missing' listener"), 
"unexpected message: " + cause.getMessage());
+    }
+
+    @Test
+    public void testFallsBackOnWebServiceListenerMismatch() throws Exception {
+        // During rolling upgrades, the target broker may not publish the 
listener yet. The lookup
+        // must not fail; instead, the redirect target falls back to the 
broker's default URLs.
+        NamespaceEphemeralData nsData = new 
NamespaceEphemeralData("broker-1:8080",
+                "pulsar://broker-1:6650", null, "http://broker-1:8080";, null, 
false,
+                new HashMap<>());
+        LookupOptions options = 
LookupOptions.builder().webServiceAdvertisedListenerName("missing").build();
+        CompletableFuture<Optional<LookupResult>> future = new 
CompletableFuture<>();
+        NamespaceService.resolveBrokerServiceLookupResult(options, nsData, 
future);
+        LookupResult result = future.get().orElseThrow();
+        assertNull(result.getWebServiceListenerName());
+        assertEquals(result.getLookupData().getHttpUrl(), 
"http://broker-1:8080";);
+    }
+
+    @Test
+    public void testSucceedsWhenListenersMatch() throws Exception {
+        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+        advertisedListeners.put("listener-a", AdvertisedListener.builder()
+                .brokerServiceUrl(new URI("pulsar://gateway:6650"))
+                .brokerHttpUrl(new URI("http://gateway:8080";))
+                .build());
+        NamespaceEphemeralData nsData = new 
NamespaceEphemeralData("broker-1:8080",
+                "pulsar://broker-1:6650", null, "http://broker-1:8080";, null, 
false,
+                advertisedListeners);
+        LookupOptions options = LookupOptions.builder()
+                .advertisedListenerName("listener-a")
+                .webServiceAdvertisedListenerName("listener-a")
+                .build();
+        CompletableFuture<Optional<LookupResult>> future = new 
CompletableFuture<>();
+        NamespaceService.resolveBrokerServiceLookupResult(options, nsData, 
future);
+        LookupResult result = future.get().orElseThrow();
+        assertEquals(result.getBrokerServiceListenerName(), "listener-a");
+        assertEquals(result.getWebServiceListenerName(), "listener-a");
+    }
+}

Reply via email to