This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch lh-support-webservice-bindaddresses-and-advertised-listeners
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ae50a871d7b86f76002de685622b647cfc969b3a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 20 14:52:46 2026 +0300

    [fix][broker] Fix CI failures in webservice bindAddresses/advertisedListeners PR
    
    Address the failing unit tests from the previous CI run:
    
    - PulsarService.start() rejected configurations without webServicePort/
      webServicePortTls even when http/https bindAddresses were configured;
      relax the precondition to also accept http/https bindAddresses
      (fixes PulsarMultiListenersWithoutInternalListenerNameTest.setup).
    - BrokerLookupData.toLookupResult silently fell back to default URLs when
      an unknown advertisedListenerName was requested; restore the legacy
      PulsarServerException so callers (ExtensibleLoadManagerWrapper, ServerCnx)
      fail fast (fixes BrokerLookupDataTest.testConstructors).
    - LookupResult.toRedirectUri unconditionally removed the listenerName query
      parameter when the result had no brokerServiceListenerName; preserve the
      caller's original listenerName when the result doesn't specify one
      (fixes LookupResultTest.testToRedirectUriPreservesExistingQueryParameters).
    - PulsarWebResource.getWebServiceListenerName threw NPE in unit tests where
      httpRequest is not injected; return null when httpRequest is null
      (fixes PersistentTopicsTest.testCreateExistedPartition and similar
      mock-based admin tests).
    - Update RedirectManagerForLoadManagerMigrationTest to set
      loadManagerMigrationEnabled=true since the redirect manager now no-ops
      unless the flag is enabled.
    - Update NamespacesTest to mock getLookupResultForWebRequest with a real
      LookupResult instead of a URL (the mocks were not updated when the
      method signature changed, causing ClassCastException at runtime).
---
 .../org/apache/pulsar/broker/PulsarService.java    |  9 +++-
 .../extensions/data/BrokerLookupData.java          |  5 ++
 .../apache/pulsar/broker/lookup/LookupResult.java  |  6 +--
 .../pulsar/broker/web/PulsarWebResource.java       |  3 ++
 .../apache/pulsar/broker/admin/NamespacesTest.java | 59 +++++++++++++++++-----
 ...RedirectManagerForLoadManagerMigrationTest.java |  1 +
 6 files changed, 64 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b587fa3f935..c15153f3443 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -39,6 +39,7 @@ import java.net.MalformedURLException;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -135,6 +136,7 @@ import 
org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
+import org.apache.pulsar.broker.validator.BindAddressValidator;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.validator.TransactionBatchedWriteValidator;
 import org.apache.pulsar.broker.web.RestException;
@@ -858,8 +860,11 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 throw new PulsarServerException("Cannot start the service once 
it was stopped");
             }
 
-            if (config.getWebServicePort().isEmpty() && 
config.getWebServicePortTls().isEmpty()) {
-                throw new 
IllegalArgumentException("webServicePort/webServicePortTls must be present");
+            if (config.getWebServicePort().isEmpty()
+                    && config.getWebServicePortTls().isEmpty()
+                    && BindAddressValidator.validateBindAddresses(config, 
Arrays.asList("http", "https")).isEmpty()) {
+                throw new IllegalArgumentException(
+                        "webServicePort/webServicePortTls or http/https 
bindAddresses must be present");
             }
 
             if (config.isAuthorizationEnabled() && 
!config.isAuthenticationEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
index 09ccf94bc87..f3dbec5fadd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
@@ -89,6 +89,11 @@ public record BrokerLookupData (String brokerId,
     }
 
     public LookupResult toLookupResult(LookupOptions options) throws 
PulsarServerException {
+        if (options.hasAdvertisedListenerName()
+                && 
!advertisedListeners.containsKey(options.getAdvertisedListenerName())) {
+            throw new PulsarServerException("the broker do not have "
+                    + options.getAdvertisedListenerName() + " listener");
+        }
         return LookupResult.create(this, options);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
index 0cd472f8114..3d46ef16cda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
@@ -262,12 +262,10 @@ public class LookupResult {
             // remove the parameter when the type is not redirect
             uriBuilder.replaceQueryParam("authoritative");
         }
-        // pass the listener parameter
+        // override the listener parameter only when the lookup result 
specifies one;
+        // otherwise leave the original request's listenerName query parameter 
untouched
         if (StringUtils.isNotBlank(brokerServiceListenerName)) {
             uriBuilder.replaceQueryParam(LISTENERNAME_PARAM, 
brokerServiceListenerName);
-        } else {
-            // remove the parameter when the listener name is not specified
-            uriBuilder.replaceQueryParam(LISTENERNAME_PARAM);
         }
         return uriBuilder.build();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index b8f41d0d327..5bf0ea14da7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -185,6 +185,9 @@ public abstract class PulsarWebResource {
     }
 
     public String getWebServiceListenerName() {
+        if (httpRequest == null) {
+            return null;
+        }
         return (String) 
httpRequest.getAttribute(WebService.ATTRIBUTE_LISTENER_NAME);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 7f31df02344..0f935908580 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -37,7 +37,6 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.URI;
-import java.net.URL;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -77,6 +76,7 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.admin.v2.Namespaces;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -738,7 +738,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         doReturn(uri).when(uriInfo).getRequestUri();
 
         // setup to redirect to another broker in the same cluster
-        doReturn(Optional.of(new URL("http://otherhost" + ":" + 8080))).when(nsSvc)
+        doReturn(Optional.of(LookupResult.builder()
+                .type(LookupResult.Type.RedirectUrl)
+                .httpUrl("http://otherhost:8080")
+                .build())).when(nsSvc)
                 .getLookupResultForWebRequest(Mockito.argThat(new 
ArgumentMatcher<NamespaceName>() {
                     @Override
                     public boolean matches(NamespaceName nsname) {
@@ -783,7 +786,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 new byte[0], null, null);
 
         // setup ownership to localhost
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         LookupOptions options = LookupOptions.builder().authoritative(false)
                 .readOnly(false).build();
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getLookupResultForWebRequest(testNs,
 options);
@@ -862,7 +868,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         uriField.set(namespaces, uriInfo);
         doReturn(URI.create(pulsar.getWebServiceAddress() + 
"/dummy/uri")).when(uriInfo).getRequestUri();
 
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         String bundledNsLocal = "test-delete-namespace-with-bundles";
         List<String> boundaries = List.of("0x00000000", "0x80000000", 
"0xffffffff");
         BundlesData bundleData = BundlesData.builder()
@@ -940,7 +949,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testUnloadNamespaces() throws Exception {
         final NamespaceName testNs = this.testLocalNamespaces.get(1);
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
                 .getLookupResultForWebRequest(Mockito.argThat(ns -> 
ns.equals(testNs)), Mockito.any());
         doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(ns -> 
ns.equals(testNs)));
@@ -959,7 +971,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     @SuppressWarnings("deprecation")
     @Test
     public void testSplitBundles() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         String bundledNsLocal = "test-bundled-namespace-1";
         List<String> boundaries = List.of("0x00000000", "0xffffffff");
         BundlesData bundleData = BundlesData.builder()
@@ -1004,7 +1019,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     @SuppressWarnings("deprecation")
     @Test
     public void testSplitBundleWithUnDividedRange() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         String bundledNsLocal = "test-bundled-namespace-1";
         List<String> boundaries = List.of("0x00000000", "0x08375b1a", 
"0x08375b1b", "0xffffffff");
         BundlesData bundleData = BundlesData.builder()
@@ -1035,7 +1053,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testUnloadNamespaceWithBundles() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         String bundledNsLocal = "test-bundled-namespace-1";
         List<String> boundaries = List.of("0x00000000", "0x80000000", 
"0xffffffff");
         BundlesData bundleData = BundlesData.builder()
@@ -1113,7 +1134,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testRetention() throws Exception {
         try {
-            URL localWebServiceUrl = new 
URL(pulsar.getSafeWebServiceAddress());
+            LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
             String bundledNsLocal = "test-bundled-namespace-1";
             List<String> boundaries = List.of("0x00000000", "0xffffffff");
             BundlesData bundleData = BundlesData.builder()
@@ -1203,7 +1227,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     @SuppressWarnings("deprecation")
     @Test
     public void testValidateTopicOwnership() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         String bundledNsLocal = "test-bundled-namespace-1";
         List<String> boundaries = List.of("0x00000000", "0xffffffff");
         BundlesData bundleData = BundlesData.builder()
@@ -1596,7 +1623,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().deleteNamespace(namespace);
     }
 
-    private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) 
throws Exception {
+    private void mockWebUrl(LookupResult localWebServiceUrl, NamespaceName 
namespace) throws Exception {
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
                 .getLookupResultForWebRequest(Mockito.argThat(bundle -> 
bundle.getNamespaceObject().equals(namespace)),
                         Mockito.any());
@@ -2057,7 +2084,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         int initBundleCount = 4;
         BundlesData data = 
BundlesData.builder().numBundles(initBundleCount).build();
         admin.namespaces().createNamespace(namespace, data);
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         final NamespaceName testNs = NamespaceName.get(namespace);
         mockWebUrl(localWebServiceUrl, testNs);
         for (int i = 0; i < 10; i++) {
@@ -2618,7 +2648,10 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testBundleValidationAfterSplit() throws Exception {
-        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        LookupResult localWebServiceUrl = LookupResult.builder()
+                .type(LookupResult.Type.BrokerUrl)
+                .httpUrl(pulsar.getSafeWebServiceAddress())
+                .build();
         String bundledNsLocal = "test-bundle-validation-after-split";
         List<String> boundaries = List.of("0x00000000", "0xffffffff");
         BundlesData bundleData = BundlesData.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java
index 057a71f2a60..ee28f730080 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java
@@ -54,6 +54,7 @@ public class RedirectManagerForLoadManagerMigrationTest {
 
         
configuration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         configuration.setLoadBalancerDebugModeEnabled(true);
+        configuration.setLoadManagerMigrationEnabled(true);
 
         // Test 1: No load manager class name found.
         doReturn(CompletableFuture.completedFuture(

Reply via email to