This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch lh-support-webservice-bindaddresses-and-advertised-listeners in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 005ae15df341e734582ac62c4a16d93d9f103f96 Author: Lari Hotari <[email protected]> AuthorDate: Thu May 21 13:06:29 2026 +0300 [improve][broker] Tighten listener validation and redirect URI handling Address review feedback on PR #221: - Rename MultipleListenerValidator.validateAndAnalysisAdvertisedListener to validateAndUpdateAdvertisedListeners and document the in-place mutation of internalListenerName. - Reject listener names containing characters that would need URL encoding; validation is now consistent between advertisedListeners and bindAddresses. - Trim whitespace around comma-separated entries in advertisedListeners and bindAddresses; skip empty entries from extra/trailing commas. - Support IPv6 literals in advertisedListeners/bindAddresses and in ServiceConfigurationUtils URL builders by wrapping bare IPv6 in brackets via Netty NetUtil. - Document that port 0 in bindAddresses is supported only for the internal listener (broker.conf, standalone.conf, ServiceConfiguration). - Refactor LookupResult.internalCreate to split listener URL overrides into a small UrlOverride helper for readability. - Add toLookupRedirectUri that injects the listenerName query parameter for topic-lookup redirects only; admin redirects no longer rewrite it. - Improve the 412 entity in LookupResult.toRedirectUri to include the resolved listener name for easier debugging. - Tolerate a missing web-service listener on the target broker (rolling-upgrade safety) and fall back to its default URL. New tests: ServiceConfigurationUtilsTest, NamespaceServiceListenerResolutionTest, plus whitespace, IPv6, and listener-name validation cases in the existing validator/lookup test classes. --- conf/broker.conf | 1 + conf/standalone.conf | 1 + pulsar-broker-common/build.gradle.kts | 1 + .../apache/pulsar/broker/ServiceConfiguration.java | 3 +- .../pulsar/broker/ServiceConfigurationUtils.java | 43 ++++-- .../broker/validator/BindAddressValidator.java | 15 +- .../validator/MultipleListenerValidator.java | 72 +++++++-- .../broker/ServiceConfigurationUtilsTest.java | 83 +++++++++++ .../broker/validator/BindAddressValidatorTest.java | 23 +++ .../validator/MultipleListenerValidatorTest.java | 60 ++++++-- .../org/apache/pulsar/broker/PulsarService.java | 6 +- .../RedirectManagerForLoadManagerMigration.java | 4 +- .../apache/pulsar/broker/lookup/LookupResult.java | 161 +++++++++++++-------- .../pulsar/broker/lookup/TopicLookupBase.java | 4 +- .../pulsar/broker/namespace/NamespaceService.java | 15 +- .../pulsar/broker/lookup/LookupResultTest.java | 61 ++++++++ .../NamespaceServiceListenerResolutionTest.java | 90 ++++++++++++ 17 files changed, 528 insertions(+), 115 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 0038cb22571..8ed01cc8656 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -90,6 +90,7 @@ bindAddress=0.0.0.0 # Each ip:port may be bound by exactly one (listener, scheme) pair. An entry that exactly # matches the auto-derived internal-listener binding is tolerated; assigning the same ip:port # to a different listener or scheme fails validation. +# Port 0 (OS-assigned ephemeral port) is supported only for the internal listener. bindAddresses= # Hostname or IP advertised to clients for the internal listener. diff --git a/conf/standalone.conf b/conf/standalone.conf index 69011aeb77e..38c5d1a5b89 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -64,6 +64,7 @@ bindAddress=0.0.0.0 # Each ip:port may be bound by exactly one (listener, scheme) pair. An entry that exactly # matches the auto-derived internal-listener binding is tolerated; assigning the same ip:port # to a different listener or scheme fails validation. +# Port 0 (OS-assigned ephemeral port) is supported only for the internal listener. bindAddresses= # Hostname or IP advertised to clients for the internal listener. diff --git a/pulsar-broker-common/build.gradle.kts b/pulsar-broker-common/build.gradle.kts index 1fa8c0437c7..6e2bc73feba 100644 --- a/pulsar-broker-common/build.gradle.kts +++ b/pulsar-broker-common/build.gradle.kts @@ -27,6 +27,7 @@ dependencies { implementation(libs.slog) implementation(libs.guava) implementation(libs.commons.lang3) + implementation(libs.netty.common) implementation(libs.bookkeeper.server) implementation(libs.opentelemetry.api) implementation(libs.simpleclient) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3829f6d062c..96f2908a74d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -293,7 +293,8 @@ public class ServiceConfiguration implements PulsarConfiguration { + " A local hostname is also accepted but not recommended.\n" + " Each `ip:port` may be bound by exactly one (listener, scheme) pair. An entry" + " that exactly matches the auto-derived internal-listener binding is tolerated;" - + " assigning the same `ip:port` to a different listener or scheme fails validation.") + + " assigning the same `ip:port` to a different listener or scheme fails validation.\n" + + " Port `0` (OS-assigned ephemeral port) is supported only for the internal listener.") private String bindAddresses; @FieldContext(category = CATEGORY_SERVER, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java index fb7d5d34706..893dc67425f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; import static org.apache.commons.lang3.StringUtils.isBlank; +import io.netty.util.NetUtil; import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; @@ -61,7 +62,7 @@ public class ServiceConfigurationUtils { public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration, boolean ignoreAdvertisedListener) { Map<String, AdvertisedListener> result = MultipleListenerValidator - .validateAndAnalysisAdvertisedListener(configuration); + .validateAndUpdateAdvertisedListeners(configuration); String advertisedAddress = configuration.getAdvertisedAddress(); if (advertisedAddress != null) { @@ -90,7 +91,7 @@ public class ServiceConfigurationUtils { */ public static AdvertisedListener getInternalListener(ServiceConfiguration config, String protocol) { Map<String, AdvertisedListener> result = MultipleListenerValidator - .validateAndAnalysisAdvertisedListener(config); + .validateAndUpdateAdvertisedListeners(config); AdvertisedListener internal = result.get(config.getInternalListenerName()); if (internal == null || !internal.hasUriForProtocol(protocol)) { // Search for an advertised listener for same protocol @@ -114,8 +115,26 @@ public class ServiceConfigurationUtils { return internal; } - private static URI createUriOrNull(String scheme, String hostname, Optional<Integer> port) { - return port.map(p -> URI.create(String.format("%s://%s:%d", scheme, hostname, p))).orElse(null); + private static URI createUriOrNull(String scheme, String ipOrHost, Optional<Integer> port) { + return port.map(p -> URI.create(String.format("%s://%s:%d", scheme, formatHost(ipOrHost), p))).orElse(null); + } + + /** + * Wrap bare IPv6 literals in square brackets so they parse correctly as the host component of a + * URL. IPv4 addresses and hostnames are returned unchanged, as is an IPv6 literal that is + * already bracketed. + */ + static String formatHost(String ipOrHost) { + if (ipOrHost == null || ipOrHost.isEmpty()) { + return ipOrHost; + } + if (ipOrHost.startsWith("[") && ipOrHost.endsWith("]")) { + return ipOrHost; + } + if (NetUtil.isValidIpV6Address(ipOrHost)) { + return "[" + ipOrHost + "]"; + } + return ipOrHost; } private static URI firstNonNullUri(URI... uris) { @@ -134,19 +153,19 @@ public class ServiceConfigurationUtils { return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); } - public static String brokerUrl(String host, int port) { - return String.format("pulsar://%s:%d", host, port); + public static String brokerUrl(String ipOrHost, int port) { + return String.format("pulsar://%s:%d", formatHost(ipOrHost), port); } - public static String brokerUrlTls(String host, int port) { - return String.format("pulsar+ssl://%s:%d", host, port); + public static String brokerUrlTls(String ipOrHost, int port) { + return String.format("pulsar+ssl://%s:%d", formatHost(ipOrHost), port); } - public static String webServiceUrl(String host, int port) { - return String.format("http://%s:%d", host, port); + public static String webServiceUrl(String ipOrHost, int port) { + return String.format("http://%s:%d", formatHost(ipOrHost), port); } - public static String webServiceUrlTls(String host, int port) { - return String.format("https://%s:%d", host, port); + public static String webServiceUrlTls(String ipOrHost, int port) { + return String.format("https://%s:%d", formatHost(ipOrHost), port); } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java index 41ae9452d2a..1df78984846 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java @@ -38,7 +38,7 @@ import org.apache.pulsar.common.configuration.BindAddress; */ public class BindAddressValidator { - private static final Pattern BIND_ADDRESSES_PATTERN = Pattern.compile("(?<name>\\w+):(?<url>.+)$"); + private static final Pattern BIND_ADDRESSES_PATTERN = Pattern.compile("(?<name>[^:]+):(?<url>.+)$"); /** * Validate the configuration of `bindAddresses`. @@ -64,8 +64,11 @@ public class BindAddressValidator { // migrate the legacy port-based configuration to bind addresses tagged with the internal listener name List<BindAddress> addresses = migrateBindAddresses(config, internalListenerName); - // parse the list of additional bind addresses + // parse the list of additional bind addresses; trim whitespace around the comma-separated + // entries so configurations split across multiple lines or padded for readability are accepted Arrays.stream(StringUtils.split(StringUtils.defaultString(config.getBindAddresses()), ",")) + .map(StringUtils::trim) + .filter(StringUtils::isNotEmpty) .map(s -> { Matcher m = BIND_ADDRESSES_PATTERN.matcher(s); if (!m.matches()) { @@ -73,7 +76,11 @@ public class BindAddressValidator { } return m; }) - .map(m -> new BindAddress(m.group("name"), URI.create(m.group("url")))) + .map(m -> { + String name = StringUtils.trim(m.group("name")); + MultipleListenerValidator.validateListenerName(name); + return new BindAddress(name, URI.create(StringUtils.trim(m.group("url")))); + }) .forEach(addresses::add); // apply the filter @@ -109,7 +116,7 @@ public class BindAddressValidator { if (addr.getAddress().getPort() == 0) { continue; } - String ipPort = addr.getAddress().getHost() + ":" + addr.getAddress().getPort(); + String ipPort = MultipleListenerValidator.formatHostPort(addr.getAddress()); BindAddress prior = uniqueIpPort.putIfAbsent(ipPort, addr); if (prior != null) { throw new IllegalArgumentException("bindAddresses: ip:port `" + ipPort diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java index 73fbef9305d..5a0c4179ff2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.validator; +import io.netty.util.NetUtil; import java.net.URI; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; @@ -35,8 +37,51 @@ import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; */ public final class MultipleListenerValidator { + /** Allowed listener-name characters: ASCII letters, digits, underscore, hyphen. */ + private static final Pattern LISTENER_NAME_PATTERN = Pattern.compile("[A-Za-z0-9_-]+"); + + /** + * Format the host:port part of a URI for use as a uniqueness key and in error messages, wrapping + * IPv6 literals in brackets so that the colon separator is unambiguous. {@link URI#getHost()} may + * or may not include the brackets depending on the JDK, so they are stripped before the + * {@link NetUtil#isValidIpV6Address} check. + */ + static String formatHostPort(URI uri) { + String host = uri.getHost(); + if (host == null) { + return host + ":" + uri.getPort(); + } + String unbracketed = host.startsWith("[") && host.endsWith("]") + ? host.substring(1, host.length() - 1) : host; + if (NetUtil.isValidIpV6Address(unbracketed)) { + return "[" + unbracketed + "]:" + uri.getPort(); + } + return host + ":" + uri.getPort(); + } + /** - * Validate the configuration of `advertisedListeners` and `internalListenerName`. + * Validate a listener name. Listener names must be non-blank and contain only ASCII letters, + * digits, underscore, and hyphen so they are safe to embed in URLs without encoding. + * + * @throws IllegalArgumentException if the name is null, blank, or contains disallowed characters. + */ + public static void validateListenerName(String name) { + if (StringUtils.isBlank(name)) { + throw new IllegalArgumentException("listener name must not be blank"); + } + if (!LISTENER_NAME_PATTERN.matcher(name).matches()) { + throw new IllegalArgumentException("listener name `" + name + "` must contain only ASCII" + + " letters, digits, underscore, or hyphen"); + } + } + + /** + * Validate `advertisedListeners` and `internalListenerName`, returning the parsed listener map. + * <p> + * This method mutates the supplied {@link ServiceConfiguration}: when {@code internalListenerName} + * is blank, it is written back with the resolved fallback value (the first parsed listener if any, + * otherwise {@value ServiceConfiguration#DEFAULT_INTERNAL_LISTENER_NAME}) so that subsequent reads + * from the config see the effective value. * <ol> * <li>`advertisedListeners` is a comma-separated list of endpoints in the form * `listener:scheme://host:port`. Supported schemes are `pulsar`, `pulsar+ssl`, `http`, and `https`. @@ -51,16 +96,18 @@ public final class MultipleListenerValidator { * internal listener to override individual URLs (e.g. a custom TLS hostname) while still * benefiting from auto-population for the rest. * </ol> - * @param config the pulsar broker configuration. + * @param config the pulsar broker configuration; updated in place when + * {@code internalListenerName} is blank. * @return the parsed and validated advertised listeners, keyed by listener name. */ - public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListener(ServiceConfiguration config) { + public static Map<String, AdvertisedListener> validateAndUpdateAdvertisedListeners(ServiceConfiguration config) { Map<String, AdvertisedListener> result = parseAdvertisedListeners(config); - // Resolve `internalListenerName` if the user left it blank. For backward compatibility, - // prefer the first entry parsed from `advertisedListeners`; otherwise fall back to the - // constant default ("internal"). The field's own default is also "internal", so this - // branch is only taken when the user explicitly set the property to an empty string. + // Resolve `internalListenerName` if the user left it blank (null, empty, or whitespace). + // For backward compatibility, prefer the first entry parsed from `advertisedListeners`; + // otherwise fall back to the constant default ("internal"). The field's own default is + // also "internal", so this branch is normally taken only when the user explicitly set the + // property to an empty string. if (StringUtils.isBlank(config.getInternalListenerName())) { String fallback = result.isEmpty() ? ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME @@ -117,13 +164,20 @@ public final class MultipleListenerValidator { return new LinkedHashMap<>(); } Map<String, List<String>> listeners = new LinkedHashMap<>(); - for (final String str : StringUtils.split(config.getAdvertisedListeners(), ",")) { + // Trim whitespace around comma-separated entries so configurations split across multiple + // lines or padded for readability are accepted, and skip empties. + for (final String raw : StringUtils.split(config.getAdvertisedListeners(), ",")) { + String str = StringUtils.trim(raw); + if (StringUtils.isEmpty(str)) { + continue; + } int index = str.indexOf(":"); if (index <= 0) { throw new IllegalArgumentException("the configure entry `advertisedListeners` is invalid. because " + str + " do not contain listener name"); } String listenerName = StringUtils.trim(str.substring(0, index)); + validateListenerName(listenerName); String value = StringUtils.trim(str.substring(index + 1)); listeners.computeIfAbsent(listenerName, k -> new ArrayList<>(2)); listeners.get(listenerName).add(value); @@ -169,7 +223,7 @@ public final class MultipleListenerValidator { } } - String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort()); + String hostPort = formatHostPort(uri); Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> new TreeSet<>()); sets.add(entry.getKey()); if (sets.size() > 1) { diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/ServiceConfigurationUtilsTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/ServiceConfigurationUtilsTest.java new file mode 100644 index 00000000000..1dccb5ab77a --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/ServiceConfigurationUtilsTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.testng.Assert.assertEquals; +import java.net.URI; +import org.testng.annotations.Test; + +public class ServiceConfigurationUtilsTest { + + @Test + public void testBrokerUrlWithHostname() { + assertEquals(ServiceConfigurationUtils.brokerUrl("broker-1.example.com", 6650), + "pulsar://broker-1.example.com:6650"); + } + + @Test + public void testBrokerUrlWithIpv4() { + assertEquals(ServiceConfigurationUtils.brokerUrl("10.0.0.1", 6650), "pulsar://10.0.0.1:6650"); + } + + @Test + public void testBrokerUrlWithIpv6Bare() { + // A bare IPv6 literal must be wrapped in brackets so the resulting URL parses correctly. + String url = ServiceConfigurationUtils.brokerUrl("::1", 6650); + assertEquals(url, "pulsar://[::1]:6650"); + URI parsed = URI.create(url); + assertEquals(parsed.getPort(), 6650); + } + + @Test + public void testBrokerUrlTlsWithIpv6Bare() { + String url = ServiceConfigurationUtils.brokerUrlTls("fe80::1", 6651); + assertEquals(url, "pulsar+ssl://[fe80::1]:6651"); + assertEquals(URI.create(url).getPort(), 6651); + } + + @Test + public void testWebServiceUrlWithIpv6Bare() { + String url = ServiceConfigurationUtils.webServiceUrl("2001:db8::1", 8080); + assertEquals(url, "http://[2001:db8::1]:8080"); + assertEquals(URI.create(url).getPort(), 8080); + } + + @Test + public void testWebServiceUrlTlsWithIpv6Bare() { + String url = ServiceConfigurationUtils.webServiceUrlTls("2001:db8::1", 8443); + assertEquals(url, "https://[2001:db8::1]:8443"); + assertEquals(URI.create(url).getPort(), 8443); + } + + @Test + public void testIpv6AlreadyBracketedIsNotDoubleWrapped() { + // If the caller passed a pre-bracketed IPv6 literal, leave it as-is rather than producing + // `pulsar://[[::1]]:6650`. + String url = ServiceConfigurationUtils.brokerUrl("[::1]", 6650); + assertEquals(url, "pulsar://[::1]:6650"); + assertEquals(URI.create(url).getPort(), 6650); + } + + @Test + public void testIpv4InUrlIsNotBracketed() { + // IPv4 addresses must not be bracketed. + String url = ServiceConfigurationUtils.webServiceUrl("0.0.0.0", 8080); + assertEquals(url, "http://0.0.0.0:8080"); + } +} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java index 3495cb1b74c..3aa79f7fa40 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java @@ -234,4 +234,27 @@ public class BindAddressValidatorTest { assertTrue(e.getMessage().contains("0.0.0.0:8080"), "expected ip:port in message but got: " + e.getMessage()); } + + @Test + public void testWhitespaceBetweenEntriesIsTrimmed() { + ServiceConfiguration config = newEmptyConfiguration(); + // Spaces, newlines, and tabs around the comma separators and around the entries themselves + // must all be tolerated and trimmed. + config.setBindAddresses(" internal:pulsar://0.0.0.0:6650 ,\n\texternal:pulsar+ssl://0.0.0.0:6651 "); + List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null); + assertEquals(addresses, Arrays.asList( + new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")), + new BindAddress("external", URI.create("pulsar+ssl://0.0.0.0:6651")))); + } + + @Test + public void testEmptyEntriesAreSkipped() { + ServiceConfiguration config = newEmptyConfiguration(); + // A trailing comma or an extra comma in the middle should be ignored, not flagged as malformed. + config.setBindAddresses("internal:pulsar://0.0.0.0:6650, ,external:pulsar+ssl://0.0.0.0:6651,"); + List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null); + assertEquals(addresses, Arrays.asList( + new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")), + new BindAddress("external", URI.create("pulsar+ssl://0.0.0.0:6651")))); + } } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java index 98fbe6cc1e8..d66c917f9b4 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java @@ -71,7 +71,7 @@ public class MultipleListenerValidatorTest { ServiceConfiguration config = newConfigWithNoPorts(); config.setInternalListenerName(null); config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651"); - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); assertEquals("internal", config.getInternalListenerName()); } @@ -85,7 +85,7 @@ public class MultipleListenerValidatorTest { config.setAdvertisedListeners( "region1:pulsar://region1.example.com:6650,region2:pulsar://region2.example.com:6650"); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); // `region1` is the first declared, so it becomes the internal listener. assertEquals(config.getInternalListenerName(), "region1"); // Both region entries are preserved in the result map. @@ -103,7 +103,7 @@ public class MultipleListenerValidatorTest { config.setBrokerServicePort(Optional.of(6650)); config.setWebServicePort(Optional.of(8080)); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); assertEquals(config.getInternalListenerName(), ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME); assertNotNull(listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME), "the legacy-port-synthesized internal listener should be present"); @@ -113,7 +113,7 @@ public class MultipleListenerValidatorTest { public void testMalformedListener() { ServiceConfiguration config = newConfigWithNoPorts(); config.setAdvertisedListeners(":pulsar://127.0.0.1:6660"); - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); } @Test(expectedExceptions = IllegalArgumentException.class) @@ -122,7 +122,7 @@ public class MultipleListenerValidatorTest { config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651," + " internal:pulsar://192.168.1.11:6660, internal:pulsar+ssl://192.168.1.11:6651"); config.setInternalListenerName("internal"); - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); } @Test(expectedExceptions = IllegalArgumentException.class) @@ -130,7 +130,7 @@ public class MultipleListenerValidatorTest { ServiceConfiguration config = newConfigWithNoPorts(); config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " internal:pulsar://192.168.1.11:6660"); config.setInternalListenerName("internal"); - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); } @Test(expectedExceptions = IllegalArgumentException.class) @@ -139,7 +139,7 @@ public class MultipleListenerValidatorTest { config.setAdvertisedListeners(" internal:pulsar+ssl://127.0.0.1:6661," + " internal:pulsar+ssl://192.168.1.11:6661"); config.setInternalListenerName("internal"); - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); } @Test(expectedExceptions = IllegalArgumentException.class) @@ -147,7 +147,7 @@ public class MultipleListenerValidatorTest { ServiceConfiguration config = newConfigWithNoPorts(); config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " external:pulsar://127.0.0.1:6660"); config.setInternalListenerName("internal"); - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); } @Test @@ -158,7 +158,7 @@ public class MultipleListenerValidatorTest { config.setBrokerServicePort(Optional.of(6650)); config.setWebServicePort(Optional.of(8080)); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); AdvertisedListener internal = listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME); assertNotNull(internal, "expected an `internal` listener to be synthesized from legacy ports"); assertEquals(internal.getBrokerServiceUrl(), URI.create("pulsar://broker-1.example.com:6650")); @@ -176,7 +176,7 @@ public class MultipleListenerValidatorTest { config.setWebServicePort(Optional.of(8080)); config.setWebServicePortTls(Optional.of(8443)); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); AdvertisedListener internal = listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME); assertNotNull(internal); assertEquals(internal.getBrokerServiceUrl(), URI.create("pulsar://broker-1.example.com:6650")); @@ -196,7 +196,7 @@ public class MultipleListenerValidatorTest { config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660," + "region1:pulsar+ssl://region1.example.com:6661"); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); assertEquals(listeners.size(), 2); AdvertisedListener region1 = listeners.get("region1"); assertNotNull(region1); @@ -219,7 +219,7 @@ public class MultipleListenerValidatorTest { config.setAdvertisedListeners("internal:pulsar://explicit.example.com:6660," + "internal:http://explicit.example.com:8888"); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); assertEquals(listeners.size(), 1); AdvertisedListener internal = listeners.get("internal"); assertNotNull(internal); @@ -243,7 +243,7 @@ public class MultipleListenerValidatorTest { config.setWebServicePortTls(Optional.of(8443)); config.setAdvertisedListeners("internal:pulsar://override.example.com:6660"); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); AdvertisedListener internal = listeners.get("internal"); assertNotNull(internal); // Overridden by the explicit advertisedListeners entry: @@ -265,7 +265,7 @@ public class MultipleListenerValidatorTest { config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660," + "region1:http://region1.example.com:8888"); Map<String, AdvertisedListener> listeners = - MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); assertEquals(listeners.size(), 1); AdvertisedListener region1 = listeners.get("region1"); assertNotNull(region1); @@ -280,7 +280,7 @@ public class MultipleListenerValidatorTest { config.setInternalListenerName("region1"); config.setAdvertisedListeners("region2:pulsar://region2.example.com:6660"); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config)); + () -> MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config)); assertTrue(e.getMessage().contains("region1"), "expected error to mention the listener name: " + e.getMessage()); } @@ -291,6 +291,36 @@ public class MultipleListenerValidatorTest { assertEquals(config.getInternalListenerName(), ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME); } + @Test + public void testWhitespaceBetweenEntriesIsTrimmed() { + ServiceConfiguration config = newConfigWithNoPorts(); + config.setInternalListenerName("internal"); + // Spaces, newlines, and tabs around the comma separators and around the entries themselves + // must all be tolerated and trimmed. + config.setAdvertisedListeners( + " internal:pulsar://broker-1:6650 ,\n\texternal:pulsar://broker-1.public:6650 "); + Map<String, AdvertisedListener> listeners = + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); + assertEquals(listeners.size(), 2); + assertEquals(listeners.get("internal").getBrokerServiceUrl(), URI.create("pulsar://broker-1:6650")); + assertEquals(listeners.get("external").getBrokerServiceUrl(), + URI.create("pulsar://broker-1.public:6650")); + } + + @Test + public void testEmptyEntriesAreSkipped() { + ServiceConfiguration config = newConfigWithNoPorts(); + config.setInternalListenerName("internal"); + // A trailing comma or an extra comma in the middle should be ignored. + config.setAdvertisedListeners("internal:pulsar://broker-1:6650, ,external:pulsar://broker-1.public:6650,"); + Map<String, AdvertisedListener> listeners = + MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); + assertEquals(listeners.size(), 2); + assertEquals(listeners.get("internal").getBrokerServiceUrl(), URI.create("pulsar://broker-1:6650")); + assertEquals(listeners.get("external").getBrokerServiceUrl(), + URI.create("pulsar://broker-1.public:6650")); + } + private ServiceConfiguration newConfigWithNoPorts() { ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerServicePort(Optional.empty()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index f85d12116ef..6a46fb3760d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -368,7 +368,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { this.openTelemetry = new PulsarBrokerOpenTelemetry(config, openTelemetrySdkBuilderCustomizer); // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName` - this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + this.advertisedListeners = MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); // the advertised address is defined as the host component of the broker's canonical name. this.advertisedAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); @@ -976,12 +976,12 @@ public class PulsarService implements AutoCloseable, ShutdownService { // necessary both for dynamic ports (`Optional.of(0)`) and for the case where the broker // is configured only via `bindAddresses` (legacy port properties left as // `Optional.empty()`), so that downstream code — in particular - // MultipleListenerValidator.validateAndAnalysisAdvertisedListener — can synthesize the + // MultipleListenerValidator.validateAndUpdateAdvertisedListeners — can synthesize the // internal advertised listener from the now-known ports. brokerService.getListenPort().ifPresent(port -> config.setBrokerServicePort(Optional.of(port))); brokerService.getListenPortTls().ifPresent(port -> config.setBrokerServicePortTls(Optional.of(port))); // Recompute the cached advertised listener map now that the bound ports are known. - this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); + this.advertisedListeners = MultipleListenerValidator.validateAndUpdateAdvertisedListeners(config); this.webServiceAddress = webAddress(config); this.webServiceAddressTls = webAddressTls(config); this.brokerServiceUrl = brokerUrl(config); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java index e84a6aecb95..13e591f87cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigration.java @@ -89,8 +89,8 @@ public class RedirectManagerForLoadManagerMigration { * @param options lookup options * @return lookup result */ - public CompletableFuture<Optional<LookupResult>> redirectIfLoadBalancerOnBrokerIsNotExpected - (LookupOptions options) { + public CompletableFuture<Optional<LookupResult>> redirectIfLoadBalancerOnBrokerIsNotExpected( + LookupOptions options) { if (!pulsar.getConfiguration().isLoadManagerMigrationEnabled()) { // no-op when load manager migration is disabled. return CompletableFuture.completedFuture(Optional.empty()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java index c45614a8be3..f27cf763a96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java @@ -22,6 +22,8 @@ import static org.apache.pulsar.broker.lookup.v2.TopicLookup.LISTENERNAME_PARAM; import java.net.URI; import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import java.util.function.Predicate; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; @@ -125,56 +127,27 @@ public class LookupResult { String webServiceUrlTls, String pulsarServiceUrl, String pulsarServiceUrlTls, Map<String, AdvertisedListener> advertisedListeners) { - String httpUrl = webServiceUrl; - String httpUrlTls = webServiceUrlTls; - String brokerServiceUrl = pulsarServiceUrl; - String brokerServiceUrlTls = pulsarServiceUrlTls; - String brokerServiceListenerName = null; - String webServiceListenerName = null; - - if (options != null && options.hasAdvertisedListenerName()) { - String advertisedListenerName = options.getAdvertisedListenerName(); - var advertisedListener = advertisedListeners.get(advertisedListenerName); - if (advertisedListener != null) { - brokerServiceListenerName = advertisedListenerName; - brokerServiceUrl = null; - brokerServiceUrlTls = null; - if (advertisedListener.getBrokerServiceUrl() != null) { - brokerServiceUrl = advertisedListener.getBrokerServiceUrl().toString(); - } - if (advertisedListener.getBrokerServiceUrlTls() != null) { - brokerServiceUrlTls = advertisedListener.getBrokerServiceUrlTls().toString(); - } - } + UrlOverride urls = new UrlOverride(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls); + + AdvertisedListener brokerListener = lookupListener(options, advertisedListeners, + LookupOptions::hasAdvertisedListenerName, LookupOptions::getAdvertisedListenerName); + if (brokerListener != null) { + urls.brokerServiceListenerName = options.getAdvertisedListenerName(); + urls.applyBrokerServiceOverride(brokerListener); } - if (options != null && options.hasWebServiceAdvertisedListenerName()) { - String advertisedListenerName = options.getWebServiceAdvertisedListenerName(); - var advertisedListener = advertisedListeners.get(advertisedListenerName); - if (advertisedListener != null) { - webServiceListenerName = advertisedListenerName; - httpUrl = null; - httpUrlTls = null; - if (advertisedListener.getBrokerHttpUrl() != null) { - httpUrl = advertisedListener.getBrokerHttpUrl().toString(); - } - if (advertisedListener.getBrokerHttpsUrl() != null) { - httpUrlTls = advertisedListener.getBrokerHttpsUrl().toString(); - } - // default to use the webServiceAdvertisedListenerName as the brokerServiceListenerName if there - // is a brokerServiceUrl or brokerServiceUrlTls configured for the webServiceAdvertisedListenerName - if (brokerServiceListenerName == null && (advertisedListener.getBrokerServiceUrl() != null - || advertisedListener.getBrokerServiceUrlTls() != null)) { - brokerServiceListenerName = advertisedListenerName; - brokerServiceUrl = null; - brokerServiceUrlTls = null; - if (advertisedListener.getBrokerServiceUrl() != null) { - brokerServiceUrl = advertisedListener.getBrokerServiceUrl().toString(); - } - if (advertisedListener.getBrokerServiceUrlTls() != null) { - brokerServiceUrlTls = advertisedListener.getBrokerServiceUrlTls().toString(); - } - } + AdvertisedListener webListener = lookupListener(options, advertisedListeners, + LookupOptions::hasWebServiceAdvertisedListenerName, + LookupOptions::getWebServiceAdvertisedListenerName); + if (webListener != null) { + urls.webServiceListenerName = options.getWebServiceAdvertisedListenerName(); + urls.applyWebServiceOverride(webListener); + // default the brokerServiceListenerName to the webServiceAdvertisedListenerName if the + // listener also configures broker service URLs and no separate advertisedListenerName was given + if (urls.brokerServiceListenerName == null + && (webListener.getBrokerServiceUrl() != null || webListener.getBrokerServiceUrlTls() != null)) { + urls.brokerServiceListenerName = options.getWebServiceAdvertisedListenerName(); + urls.applyBrokerServiceOverride(webListener); } } @@ -192,16 +165,55 @@ public class LookupResult { return builder() .type(type) .brokerId(brokerId) - .httpUrl(httpUrl) - .httpUrlTls(httpUrlTls) - .brokerServiceUrl(brokerServiceUrl) - .brokerServiceUrlTls(brokerServiceUrlTls) + .httpUrl(urls.httpUrl) + .httpUrlTls(urls.httpUrlTls) + .brokerServiceUrl(urls.brokerServiceUrl) + .brokerServiceUrlTls(urls.brokerServiceUrlTls) .authoritativeRedirect(authoritativeRedirect) - .brokerServiceListenerName(brokerServiceListenerName) - .webServiceListenerName(webServiceListenerName) + .brokerServiceListenerName(urls.brokerServiceListenerName) + .webServiceListenerName(urls.webServiceListenerName) .build(); } + private static AdvertisedListener lookupListener(LookupOptions options, + Map<String, AdvertisedListener> advertisedListeners, + Predicate<LookupOptions> hasName, + Function<LookupOptions, String> getName) { + if (options == null || !hasName.test(options)) { + return null; + } + return advertisedListeners.get(getName.apply(options)); + } + + /** Mutable URL-set used while resolving listener-specific overrides for a LookupResult. */ + private static final class UrlOverride { + String httpUrl; + String httpUrlTls; + String brokerServiceUrl; + String brokerServiceUrlTls; + String brokerServiceListenerName; + String webServiceListenerName; + + UrlOverride(String httpUrl, String httpUrlTls, String brokerServiceUrl, String brokerServiceUrlTls) { + this.httpUrl = httpUrl; + this.httpUrlTls = httpUrlTls; + this.brokerServiceUrl = brokerServiceUrl; + this.brokerServiceUrlTls = brokerServiceUrlTls; + } + + void applyBrokerServiceOverride(AdvertisedListener listener) { + brokerServiceUrl = listener.getBrokerServiceUrl() != null + ? listener.getBrokerServiceUrl().toString() : null; + brokerServiceUrlTls = listener.getBrokerServiceUrlTls() != null + ? listener.getBrokerServiceUrlTls().toString() : null; + } + + void applyWebServiceOverride(AdvertisedListener listener) { + httpUrl = listener.getBrokerHttpUrl() != null ? listener.getBrokerHttpUrl().toString() : null; + httpUrlTls = listener.getBrokerHttpsUrl() != null ? listener.getBrokerHttpsUrl().toString() : null; + } + } + public boolean isBrokerUrl() { return type == Type.BrokerUrl; } @@ -233,7 +245,7 @@ public class LookupResult { * @return the redirect URI */ public URI toRedirectUri(URI requestUri) { - return toRedirectUriInternal(requestUri, this.authoritativeRedirect); + return toRedirectUriInternal(requestUri, this.authoritativeRedirect, false); } /** @@ -242,18 +254,40 @@ public class LookupResult { * redirect as authoritative regardless of what the lookup result carried). */ public URI toRedirectUri(URI requestUri, boolean authoritativeRedirectOverride) { - return toRedirectUriInternal(requestUri, authoritativeRedirectOverride); + return toRedirectUriInternal(requestUri, authoritativeRedirectOverride, false); } - private URI toRedirectUriInternal(URI requestUri, boolean authoritativeRedirect) { + /** + * Same as {@link #toRedirectUri(URI)} but specialised for topic-lookup redirects: when this + * {@code LookupResult} has a resolved {@code brokerServiceListenerName} it is always written to + * the {@code listenerName} query parameter on the redirect URI. This handles the case where the + * original lookup request carried the listener name in a header rather than as a query + * parameter — the header does not survive an HTTP redirect, so the parameter form must be + * propagated to the next broker. Other (non-lookup) redirect paths use {@link #toRedirectUri} + * and leave the parameter alone. + */ + public URI toLookupRedirectUri(URI requestUri) { + return toRedirectUriInternal(requestUri, this.authoritativeRedirect, true); + } + + private URI toRedirectUriInternal(URI requestUri, boolean authoritativeRedirect, + boolean injectListenerNameQueryParam) { boolean requireHttps = "https".equalsIgnoreCase(requestUri.getScheme()); String webServiceUrl = requireHttps ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl(); if (webServiceUrl == null) { // Preserve the legacy 412 error semantics when the redirect target broker has no URL // configured for the requested scheme. + String scheme = requireHttps ? "https" : "http"; + StringBuilder entity = new StringBuilder() + .append("No ").append(scheme).append(" URL configured for broker ") + .append(lookupData.getBrokerId()); + if (StringUtils.isNotBlank(webServiceListenerName)) { + entity.append(" on web service listener `").append(webServiceListenerName).append("`"); + } else if (StringUtils.isNotBlank(brokerServiceListenerName)) { + entity.append(" on listener `").append(brokerServiceListenerName).append("`"); + } throw new WebApplicationException(Response.status(Response.Status.PRECONDITION_FAILED) - .entity("No " + (requireHttps ? "https" : "http") - + " URL configured for broker " + lookupData.getBrokerId()) + .entity(entity.toString()) .build()); } URI webServiceUri = URI.create(webServiceUrl); @@ -269,9 +303,12 @@ public class LookupResult { // remove the parameter when the type is not redirect uriBuilder.replaceQueryParam("authoritative"); } - // override the listener parameter only when the lookup result specifies one; - // otherwise leave the original request's listenerName query parameter untouched - if (StringUtils.isNotBlank(brokerServiceListenerName)) { + // Only set the listenerName query parameter on topic-lookup redirects. The original lookup + // request can carry it either as a query parameter or as a header; the latter does not + // survive an HTTP redirect, so the resolved listener name must be reinjected as a query + // parameter so the next broker sees it. Other redirect paths (admin endpoints) do not + // understand `listenerName` as a parameter, so for those we leave the request URI alone. + if (injectListenerNameQueryParam && StringUtils.isNotBlank(brokerServiceListenerName)) { uriBuilder.replaceQueryParam(LISTENERNAME_PARAM, brokerServiceListenerName); } return uriBuilder.build(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 72bf6d14c31..dd08fc10e91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -108,7 +108,7 @@ public class TopicLookupBase extends PulsarWebResource { LookupResult lookupResult = optionalResult.get(); if (lookupResult.isLoadManagerMigration()) { - URI redirectUri = lookupResult.toRedirectUri(uri.getRequestUri()); + URI redirectUri = lookupResult.toLookupRedirectUri(uri.getRequestUri()); log.debug().log("Redirecting to a broker with the expected load manager."); throw new WebApplicationException(Response.temporaryRedirect(redirectUri).build()); } @@ -121,7 +121,7 @@ public class TopicLookupBase extends PulsarWebResource { throw new UnsupportedOperationException( "This implementation expects that the requestUri is a topic lookup."); } - URI redirectUri = lookupResult.toRedirectUri(requestUri); + URI redirectUri = lookupResult.toLookupRedirectUri(requestUri); log.debug() .attr("topic", topicName) .attr("redirectUri", redirectUri) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index db6ce90c203..fb7c3d68bf6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -263,8 +263,8 @@ public class NamespaceService implements AutoCloseable { return future; } - private CompletableFuture<Optional<LookupResult>> redirectIfLoadBalancerOnBrokerIsNotExpected - (ServiceUnitId bundle, LookupOptions options) { + private CompletableFuture<Optional<LookupResult>> redirectIfLoadBalancerOnBrokerIsNotExpected( + ServiceUnitId bundle, LookupOptions options) { if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { return CompletableFuture.completedFuture(Optional.empty()); } @@ -529,8 +529,9 @@ public class NamespaceService implements AutoCloseable { return CompletableFuture.completedFuture(null); } - private static void resolveBrokerServiceLookupResult(LookupOptions options, NamespaceEphemeralData nsData, - CompletableFuture<Optional<LookupResult>> future) { + // package-private for tests + static void resolveBrokerServiceLookupResult(LookupOptions options, NamespaceEphemeralData nsData, + CompletableFuture<Optional<LookupResult>> future) { LookupResult result = LookupResult.create(nsData, options); // fail the lookup if advertised listener name is provided and does not match @@ -542,12 +543,16 @@ public class NamespaceService implements AutoCloseable { return; } + // Tolerate a missing web service listener on the target broker — during a rolling cluster + // upgrade, older brokers may not have published the listener yet. Fall back to the default + // web service URL (the broker's primary HTTP/HTTPS) in that case; toRedirectUri picks it up + // because httpUrl/httpUrlTls were left at the broker's defaults by LookupResult.create. if (options.hasWebServiceAdvertisedListenerName() && !Objects.equals(result.getWebServiceListenerName(), options.getWebServiceAdvertisedListenerName())) { log.warn() .attr("brokerId", result.getLookupData().getBrokerId()) .attr("webServiceListenerName", options.getWebServiceAdvertisedListenerName()) - .log("The broker doesn't have the listener configured."); + .log("Target broker has no matching web service listener; redirecting to its default URL."); } future.complete(Optional.of(result)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java index 917bd1a3128..0571fa6915e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java @@ -442,6 +442,67 @@ public class LookupResultTest { assertTrue(query.contains("other=1")); } + @Test + public void testToLookupRedirectUriInjectsListenerNameWhenResolved() { + // Topic-lookup case: the original request may carry the listener name in a header that the + // redirect does not preserve. toLookupRedirectUri must inject it as a query parameter so the + // next broker sees it. + LookupResult result = LookupResult.builder() + .type(LookupResult.Type.RedirectUrl) + .httpUrl(WEB_URL) + .brokerServiceListenerName("external") + .build(); + + URI request = URI.create("http://original-host:1234/lookup/v2/topic/persistent/p/d/t"); + + URI redirect = result.toLookupRedirectUri(request); + + String query = redirect.getQuery(); + assertNotNull(query); + assertTrue(query.contains("listenerName=external"), "query=" + query); + } + + @Test + public void testToRedirectUriDoesNotInjectListenerNameForAdminPaths() { + // Admin redirects do not understand the listenerName query parameter — even if the + // LookupResult has a resolved brokerServiceListenerName, toRedirectUri must leave the + // request's query string alone. + LookupResult result = LookupResult.builder() + .type(LookupResult.Type.RedirectUrl) + .httpUrl(WEB_URL) + .brokerServiceListenerName("external") + .build(); + + URI request = URI.create("http://original-host:1234/admin/v2/persistent/p/d/t"); + + URI redirect = result.toRedirectUri(request); + + String query = redirect.getQuery(); + // no listenerName query param added + if (query != null) { + assertFalse(query.contains("listenerName"), "query=" + query); + } + } + + @Test + public void testToRedirectUriPreservesExistingListenerNameForAdminPaths() { + // If the original admin request already carries listenerName in the query string, leave it + // there rather than dropping it. + LookupResult result = LookupResult.builder() + .type(LookupResult.Type.RedirectUrl) + .httpUrl(WEB_URL) + .brokerServiceListenerName("external") + .build(); + + URI request = URI.create("http://original-host:1234/admin?listenerName=fromClient"); + + URI redirect = result.toRedirectUri(request); + + String query = redirect.getQuery(); + assertNotNull(query); + assertTrue(query.contains("listenerName=fromClient"), "query=" + query); + } + @Test public void testToStringIncludesAllFields() { LookupResult result = LookupResult.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceListenerResolutionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceListenerResolutionTest.java new file mode 100644 index 00000000000..708a6c0cadf --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceListenerResolutionTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.namespace; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.lookup.LookupResult; +import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; +import org.testng.annotations.Test; + +/** + * Unit tests for {@link NamespaceService#resolveBrokerServiceLookupResult}. + */ +@Test(groups = "broker") +public class NamespaceServiceListenerResolutionTest { + + @Test + public void testFailsOnBrokerServiceListenerMismatch() { + NamespaceEphemeralData nsData = new NamespaceEphemeralData("broker-1:8080", + "pulsar://broker-1:6650", null, "http://broker-1:8080", null, false, + new HashMap<>()); + LookupOptions options = LookupOptions.builder().advertisedListenerName("missing").build(); + CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>(); + NamespaceService.resolveBrokerServiceLookupResult(options, nsData, future); + assertTrue(future.isCompletedExceptionally()); + Throwable cause = expectThrows(Exception.class, future::get).getCause(); + assertTrue(cause instanceof PulsarServerException, "unexpected cause: " + cause); + assertTrue(cause.getMessage().contains("'missing' listener"), "unexpected message: " + cause.getMessage()); + } + + @Test + public void testFallsBackOnWebServiceListenerMismatch() throws Exception { + // During rolling upgrades, the target broker may not publish the listener yet. The lookup + // must not fail; instead, the redirect target falls back to the broker's default URLs. + NamespaceEphemeralData nsData = new NamespaceEphemeralData("broker-1:8080", + "pulsar://broker-1:6650", null, "http://broker-1:8080", null, false, + new HashMap<>()); + LookupOptions options = LookupOptions.builder().webServiceAdvertisedListenerName("missing").build(); + CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>(); + NamespaceService.resolveBrokerServiceLookupResult(options, nsData, future); + LookupResult result = future.get().orElseThrow(); + assertNull(result.getWebServiceListenerName()); + assertEquals(result.getLookupData().getHttpUrl(), "http://broker-1:8080"); + } + + @Test + public void testSucceedsWhenListenersMatch() throws Exception { + Map<String, AdvertisedListener> advertisedListeners = new HashMap<>(); + advertisedListeners.put("listener-a", AdvertisedListener.builder() + .brokerServiceUrl(new URI("pulsar://gateway:6650")) + .brokerHttpUrl(new URI("http://gateway:8080")) + .build()); + NamespaceEphemeralData nsData = new NamespaceEphemeralData("broker-1:8080", + "pulsar://broker-1:6650", null, "http://broker-1:8080", null, false, + advertisedListeners); + LookupOptions options = LookupOptions.builder() + .advertisedListenerName("listener-a") + .webServiceAdvertisedListenerName("listener-a") + .build(); + CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>(); + NamespaceService.resolveBrokerServiceLookupResult(options, nsData, future); + LookupResult result = future.get().orElseThrow(); + assertEquals(result.getBrokerServiceListenerName(), "listener-a"); + assertEquals(result.getWebServiceListenerName(), "listener-a"); + } +}
