This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef27cd241fe [improve][proxy] Support scalable topics through the proxy (#25879)
ef27cd241fe is described below
commit ef27cd241fed53c9defaf57f9c2e7135d3eeef6a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 13:06:13 2026 -0700
[improve][proxy] Support scalable topics through the proxy (#25879)
---
.../pulsar/client/impl/v5/DagWatchClient.java | 7 ++
.../client/impl/v5/ScalableConsumerClient.java | 7 ++
.../client/impl/v5/ScalableTopicsWatcher.java | 4 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 19 +++++
.../client/impl/PulsarChannelInitializer.java | 14 +++-
.../pulsar/client/impl/PulsarClientImpl.java | 14 ++++
pulsar-proxy/build.gradle.kts | 2 +
.../pulsar/proxy/server/ProxyConnection.java | 58 ++++++++++++++-
.../proxy/server/ProxyScalableTopicsTest.java | 85 ++++++++++++++++++++++
9 files changed, 204 insertions(+), 6 deletions(-)
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index 7fc31f2ad7a..40510c2e1ae 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -65,6 +65,7 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
private volatile LayoutChangeListener listener;
private volatile ClientCnx cnx;
private volatile boolean closed = false;
+ private volatile boolean usingProxy = false;
/** Canonical topic://t/n/x identity returned by the broker. Resolved on the first
* update; used as the parent topic when computing segment:// URIs for real DAGs. */
private volatile TopicName resolvedTopicName;
@@ -117,6 +118,7 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
}
return;
}
+ this.usingProxy = newCnx.isProxied();
this.cnx = newCnx;
newCnx.registerDagWatchSession(sessionId, this);
newCnx.ctx().writeAndFlush(
@@ -239,6 +241,11 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
});
}
+ /** Whether the DAG-watch connection was established through a proxy. */
+ boolean isUsingProxy() {
+ return usingProxy;
+ }
+
ClientSegmentLayout currentLayout() {
return currentLayout.get();
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index ad9e49ecb15..fa0fc37c658 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -140,6 +140,13 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
DagWatchClient watch = new DagWatchClient(v4Client, topicName);
watch.start()
.thenCompose(layout -> {
+ if (watch.isUsingProxy()) {
+ // Behind a proxy the controller's advertised address isn't reachable
+ // directly. Pair to any broker through the proxy; that broker forwards the
+ // subscribe to the controller and relays assignment updates back.
+ log.debug().log("Connecting through proxy to any
broker for subscribe");
+ return v4Client.getAnyBrokerProxyConnection();
+ }
String controllerUrl = layout.controllerBrokerUrl();
if (controllerUrl == null || controllerUrl.isEmpty()) {
// Controller leader election hasn't completed yet (or
the broker
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
index 0350d83df52..3ece8fe0628 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
@@ -116,7 +116,7 @@ final class ScalableTopicsWatcher implements
ScalableTopicsWatcherSession, AutoC
* subsequent {@code Snapshot} / {@code Diff} flows through {@link
#setListener}.
*/
CompletableFuture<List<String>> start() {
- v4Client.getConnectionToServiceUrl()
+ v4Client.getAnyBrokerProxyConnection()
.thenAccept(this::attach)
.exceptionally(ex -> {
initialSnapshotFuture.completeExceptionally(ex);
@@ -264,7 +264,7 @@ final class ScalableTopicsWatcher implements
ScalableTopicsWatcherSession, AutoC
if (closed) {
return;
}
- v4Client.getConnectionToServiceUrl()
+ v4Client.getAnyBrokerProxyConnection()
.thenAccept(newCnx -> {
if (closed) {
return;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 97430e72a22..ff12f01b40f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -211,6 +211,11 @@ public class ClientCnx extends PulsarHandler {
protected final int protocolVersion;
private final long operationTimeoutMs;
+ // Value set as proxyToTargetBrokerAddress to ask the proxy to pair this connection to any broker
+ // it selects. It must be present-but-empty on the wire: a null proxyToBrokerUrl would be omitted
+ // entirely, leaving the proxy in plain lookup mode instead of pairing the connection.
+ static final String PROXY_TO_ANY_BROKER_URL = "";
+
protected String proxyToTargetBrokerAddress = null;
// Remote hostName with which client is connected
protected String remoteHostName = null;
@@ -1585,6 +1590,20 @@ public class ClientCnx extends PulsarHandler {
targetBrokerAddress.getPort());
}
+ void setProxyToAnyBroker() {
+ this.proxyToTargetBrokerAddress = PROXY_TO_ANY_BROKER_URL;
+ }
+
+ /**
+ * Whether this connection goes through a proxy. True for both a specific-broker proxy connection
+ * (proxyToTargetBrokerAddress is a {@code host:port}) and an any-broker pairing
+ * ({@link #PROXY_TO_ANY_BROKER_URL}, the empty string); only a direct, non-proxied connection
+ * leaves it null. Hence the null check rather than a comparison against the any-broker sentinel.
+ */
+ public boolean isProxied() {
+ return proxyToTargetBrokerAddress != null;
+ }
+
void setRemoteHostName(String remoteHostName) {
this.remoteHostName = remoteHostName;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index fdd672b587b..fc7daec082b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -176,6 +176,14 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
return initSocks5Future;
}
+ /**
+ * Sentinel logical address marking a connection that the proxy should pair to any broker it
+ * selects (the client sends an empty proxyToBrokerUrl). It is never resolved or dialed — only
+ * the physical address (the proxy) is — and is matched by identity.
+ */
+ static final InetSocketAddress PROXY_TO_ANY_BROKER =
+
InetSocketAddress.createUnresolved("proxy-to-any-broker.pulsar.invalid", 0);
+
CompletableFuture<Channel> initializeClientCnx(Channel ch,
InetSocketAddress
logicalAddress,
InetSocketAddress
unresolvedPhysicalAddress) {
@@ -186,7 +194,11 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
throw new IllegalStateException("Missing ClientCnx. This
should not happen.");
}
- if (!logicalAddress.equals(unresolvedPhysicalAddress)) {
+ if (logicalAddress == PROXY_TO_ANY_BROKER) {
+ // Pair through the proxy to any broker: send an empty proxyToBrokerUrl so the proxy
+ // selects a broker and bridges this connection to it.
+ cnx.setProxyToAnyBroker();
+ } else if (!logicalAddress.equals(unresolvedPhysicalAddress)) {
// We are connecting through a proxy. We need to set the
target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
cnx.setTargetBroker(logicalAddress);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a8c7b46b3db..610cfb78c6c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1281,6 +1281,20 @@ public class PulsarClientImpl implements PulsarClient {
return getConnection(address, address,
cnxPool.genRandomKeyToSelectCon());
}
+ /**
+ * Open a connection to the proxy and ask it to pair the connection to any broker it selects
+ * (an empty proxyToBrokerUrl). Used for control-plane operations that aren't tied to a specific
+ * broker (e.g. scalable-topic subscribe/namespace-watch) when connecting through a proxy.
+ */
+ public CompletableFuture<ClientCnx> getAnyBrokerProxyConnection() {
+ if (!lookup.isBinaryProtoLookupService()) {
+ return FutureUtil.failedFuture(new
PulsarClientException.InvalidServiceURL(
+ "Can't pair to any broker through an HTTP service URL",
null));
+ }
+ return getConnection(PulsarChannelInitializer.PROXY_TO_ANY_BROKER,
lookup.resolveHost(),
+ cnxPool.genRandomKeyToSelectCon());
+ }
+
public CompletableFuture<ClientCnx> getProxyConnection(final
InetSocketAddress logicalAddress,
final int
randomKeyForSelectConnection) {
if (!lookup.isBinaryProtoLookupService()) {
diff --git a/pulsar-proxy/build.gradle.kts b/pulsar-proxy/build.gradle.kts
index 19173ff99e5..82393b2e486 100644
--- a/pulsar-proxy/build.gradle.kts
+++ b/pulsar-proxy/build.gradle.kts
@@ -68,6 +68,8 @@ dependencies {
testImplementation(project(":pulsar-broker"))
testImplementation(project(path = ":pulsar-broker", configuration =
"testJar"))
testImplementation(project(":pulsar-client-admin-original"))
+ testImplementation(project(":pulsar-client-v5"))
+ testImplementation(project(":pulsar-client-api-v5"))
testImplementation(project(":testmocks"))
testImplementation(libs.asynchttpclient)
testImplementation(libs.avro)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index bfdfb83f8cc..53bd36f704d 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -32,6 +32,8 @@ import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.HashSet;
@@ -377,10 +379,22 @@ public class ProxyConnection extends PulsarHandler {
.attr("hasProxyToBrokerUrl", hasProxyToBrokerUrl)
.log("complete connection, init proxy handler. authenticated
with role");
if (hasProxyToBrokerUrl) {
- // Optimize proxy connection to fail-fast if the target broker
isn't active
- // Pulsar client will retry connecting after a back off timeout
- if (service.getConfiguration().isCheckActiveBrokers()
+ if (proxyToBrokerUrl.isBlank()) {
+ // An empty proxyToBrokerUrl is the "pair me to any broker" sentinel: the
+ // client (e.g. a scalable-topic control connection) doesn't target a specific
+ // broker, so the proxy selects one and bridges the connection to it.
+ String anyBroker = selectAnyBrokerHostAndPort();
+ if (anyBroker == null) {
+ state = State.Closing;
+ writeAndFlushAndClose(Commands.newError(-1,
+ ServerError.ServiceNotReady, "No broker available
to proxy the connection."));
+ return;
+ }
+ proxyToBrokerUrl = anyBroker;
+ } else if (service.getConfiguration().isCheckActiveBrokers()
&& !isBrokerActive(proxyToBrokerUrl)) {
+ // Optimize proxy connection to fail-fast if the target broker isn't active
+ // Pulsar client will retry connecting after a back off timeout
state = State.Closing;
log.warn()
.attr("remoteAddress", remoteAddress)
@@ -459,6 +473,44 @@ public class ProxyConnection extends PulsarHandler {
}
}
+ /**
+ * Select a broker for an "any broker" proxy pairing (empty proxyToBrokerUrl). Returns the
+ * broker as {@code host:port} (the format {@link BrokerProxyValidator} expects), or
+ * {@code null} if no broker is available.
+ */
+ private String selectAnyBrokerHostAndPort() {
+ boolean tls = service.getConfiguration().isTlsEnabledWithBroker();
+ String brokerUrl = tls
+ ? service.getConfiguration().getBrokerServiceURLTLS()
+ : service.getConfiguration().getBrokerServiceURL();
+ if (brokerUrl == null || brokerUrl.isBlank()) {
+ try {
+ ServiceLookupData broker =
service.getDiscoveryProvider().nextBroker();
+ brokerUrl = tls ? broker.getPulsarServiceUrlTls() :
broker.getPulsarServiceUrl();
+ } catch (Exception e) {
+ log.warn()
+ .attr("remoteAddress", remoteAddress)
+ .exception(e)
+ .log("Failed to select a broker for any-broker
proxying");
+ return null;
+ }
+ }
+ if (brokerUrl == null || brokerUrl.isBlank()) {
+ return null;
+ }
+ try {
+ URI uri = new URI(brokerUrl);
+ if (uri.getHost() == null || uri.getPort() < 0) {
+ log.warn().attr("brokerUrl", brokerUrl).log("Broker URL is
missing host or port");
+ return null;
+ }
+ return uri.getHost() + ":" + uri.getPort();
+ } catch (URISyntaxException e) {
+ log.warn().attr("brokerUrl", brokerUrl).exception(e).log("Invalid
broker URL");
+ return null;
+ }
+ }
+
private void handleBrokerConnected(DirectProxyHandler directProxyHandler,
CommandConnected connected) {
assert ctx.executor().inEventLoop();
if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen()
&& this.directProxyHandler == null) {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyScalableTopicsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyScalableTopicsTest.java
new file mode 100644
index 00000000000..001a2b201b1
--- /dev/null
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyScalableTopicsTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end verification that scalable topics ({@code topic://}) work through the Pulsar proxy
+ * with the V5 client.
+ *
+ * <p>Exercises every control- and data-plane path that has to traverse the proxy:
+ * <ul>
+ * <li>the DAG-watch lookup and per-segment producer/consumer connections ride the proxy's
+ * transparent (direct) mode via ordinary {@code topic://} / {@code segment://} lookups;</li>
+ * <li>the consumer's controller subscribe isn't tied to a specific broker, so it's sent over an
+ * any-broker connection that the proxy pairs to a broker it selects.</li>
+ * </ul>
+ */
+public class ProxyScalableTopicsTest extends ProxyMultiBrokerBaseTest {
+
+ @Test
+ public void testProduceAndConsumeScalableTopicThroughProxy() throws
Exception {
+ // Two segments so the topic can span more than one broker behind the proxy.
+ String topic = "topic://public/default/" +
BrokerTestUtil.newUniqueName("scalable-proxy");
+ admin.scalableTopics().createScalableTopic(topic, 2);
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(getProxyServiceUrl())
+ .build();
+
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ QueueConsumer<String> consumer =
client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("proxy-sub")
+ .subscribe();
+
+ int numMessages = 20;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value("msg-" + i).send();
+ }
+
+ Set<String> received = new HashSet<>();
+ for (int i = 0; i < numMessages; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+ assertNotNull(msg, "consumer must receive a message within the
timeout through the proxy");
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(received.size(), numMessages, "every produced message
must be received through the proxy");
+ }
+}