This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef27cd241fe [improve][proxy] Support scalable topics through the proxy (#25879)
ef27cd241fe is described below

commit ef27cd241fed53c9defaf57f9c2e7135d3eeef6a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 13:06:13 2026 -0700

    [improve][proxy] Support scalable topics through the proxy (#25879)
---
 .../pulsar/client/impl/v5/DagWatchClient.java      |  7 ++
 .../client/impl/v5/ScalableConsumerClient.java     |  7 ++
 .../client/impl/v5/ScalableTopicsWatcher.java      |  4 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 19 +++++
 .../client/impl/PulsarChannelInitializer.java      | 14 +++-
 .../pulsar/client/impl/PulsarClientImpl.java       | 14 ++++
 pulsar-proxy/build.gradle.kts                      |  2 +
 .../pulsar/proxy/server/ProxyConnection.java       | 58 ++++++++++++++-
 .../proxy/server/ProxyScalableTopicsTest.java      | 85 ++++++++++++++++++++++
 9 files changed, 204 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index 7fc31f2ad7a..40510c2e1ae 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -65,6 +65,7 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
     private volatile LayoutChangeListener listener;
     private volatile ClientCnx cnx;
     private volatile boolean closed = false;
+    private volatile boolean usingProxy = false;
     /** Canonical topic://t/n/x identity returned by the broker. Resolved on the first
      *  update; used as the parent topic when computing segment:// URIs for real DAGs. */
     private volatile TopicName resolvedTopicName;
@@ -117,6 +118,7 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
             }
             return;
         }
+        this.usingProxy = newCnx.isProxied();
         this.cnx = newCnx;
         newCnx.registerDagWatchSession(sessionId, this);
         newCnx.ctx().writeAndFlush(
@@ -239,6 +241,11 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
                 });
     }
 
+    /** Whether the DAG-watch connection was established through a proxy. */
+    boolean isUsingProxy() {
+        return usingProxy;
+    }
+
     ClientSegmentLayout currentLayout() {
         return currentLayout.get();
     }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index ad9e49ecb15..fa0fc37c658 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -140,6 +140,13 @@ final class ScalableConsumerClient implements ScalableConsumerSession, AutoClose
         DagWatchClient watch = new DagWatchClient(v4Client, topicName);
         watch.start()
                 .thenCompose(layout -> {
+                    if (watch.isUsingProxy()) {
+                        // Behind a proxy the controller's advertised address isn't reachable
+                        // directly. Pair to any broker through the proxy; that broker forwards the
+                        // subscribe to the controller and relays assignment updates back.
+                        log.debug().log("Connecting through proxy to any broker for subscribe");
+                        return v4Client.getAnyBrokerProxyConnection();
+                    }
                     String controllerUrl = layout.controllerBrokerUrl();
                     if (controllerUrl == null || controllerUrl.isEmpty()) {
                         // Controller leader election hasn't completed yet (or the broker
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
index 0350d83df52..3ece8fe0628 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
@@ -116,7 +116,7 @@ final class ScalableTopicsWatcher implements ScalableTopicsWatcherSession, AutoC
      * subsequent {@code Snapshot} / {@code Diff} flows through {@link #setListener}.
      */
     CompletableFuture<List<String>> start() {
-        v4Client.getConnectionToServiceUrl()
+        v4Client.getAnyBrokerProxyConnection()
                 .thenAccept(this::attach)
                 .exceptionally(ex -> {
                     initialSnapshotFuture.completeExceptionally(ex);
@@ -264,7 +264,7 @@ final class ScalableTopicsWatcher implements ScalableTopicsWatcherSession, AutoC
         if (closed) {
             return;
         }
-        v4Client.getConnectionToServiceUrl()
+        v4Client.getAnyBrokerProxyConnection()
                 .thenAccept(newCnx -> {
                     if (closed) {
                         return;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 97430e72a22..ff12f01b40f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -211,6 +211,11 @@ public class ClientCnx extends PulsarHandler {
     protected final int protocolVersion;
     private final long operationTimeoutMs;
 
+    // Value set as proxyToTargetBrokerAddress to ask the proxy to pair this connection to any broker
+    // it selects. It must be present-but-empty on the wire: a null proxyToBrokerUrl would be omitted
+    // entirely, leaving the proxy in plain lookup mode instead of pairing the connection.
+    static final String PROXY_TO_ANY_BROKER_URL = "";
+
     protected String proxyToTargetBrokerAddress = null;
     // Remote hostName with which client is connected
     protected String remoteHostName = null;
@@ -1585,6 +1590,20 @@ public class ClientCnx extends PulsarHandler {
                 targetBrokerAddress.getPort());
     }
 
+    void setProxyToAnyBroker() {
+        this.proxyToTargetBrokerAddress = PROXY_TO_ANY_BROKER_URL;
+    }
+
+    /**
+     * Whether this connection goes through a proxy. True for both a specific-broker proxy connection
+     * (proxyToTargetBrokerAddress is a {@code host:port}) and an any-broker pairing
+     * ({@link #PROXY_TO_ANY_BROKER_URL}, the empty string); only a direct, non-proxied connection
+     * leaves it null. Hence the null check rather than a comparison against the any-broker sentinel.
+     */
+    public boolean isProxied() {
+        return proxyToTargetBrokerAddress != null;
+    }
+
      void setRemoteHostName(String remoteHostName) {
         this.remoteHostName = remoteHostName;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index fdd672b587b..fc7daec082b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -176,6 +176,14 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
         return initSocks5Future;
     }
 
+    /**
+     * Sentinel logical address marking a connection that the proxy should pair to any broker it
+     * selects (the client sends an empty proxyToBrokerUrl). It is never resolved or dialed — only
+     * the physical address (the proxy) is — and is matched by identity.
+     */
+    static final InetSocketAddress PROXY_TO_ANY_BROKER =
+            InetSocketAddress.createUnresolved("proxy-to-any-broker.pulsar.invalid", 0);
+
     CompletableFuture<Channel> initializeClientCnx(Channel ch,
                                                    InetSocketAddress logicalAddress,
                                                    InetSocketAddress unresolvedPhysicalAddress) {
@@ -186,7 +194,11 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
                 throw new IllegalStateException("Missing ClientCnx. This should not happen.");
             }
 
-            if (!logicalAddress.equals(unresolvedPhysicalAddress)) {
+            if (logicalAddress == PROXY_TO_ANY_BROKER) {
+                // Pair through the proxy to any broker: send an empty proxyToBrokerUrl so the proxy
+                // selects a broker and bridges this connection to it.
+                cnx.setProxyToAnyBroker();
+            } else if (!logicalAddress.equals(unresolvedPhysicalAddress)) {
                 // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
                 // it can be specified when sending the CommandConnect.
                 cnx.setTargetBroker(logicalAddress);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a8c7b46b3db..610cfb78c6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1281,6 +1281,20 @@ public class PulsarClientImpl implements PulsarClient {
         return getConnection(address, address, cnxPool.genRandomKeyToSelectCon());
     }
 
+    /**
+     * Open a connection to the proxy and ask it to pair the connection to any broker it selects
+     * (an empty proxyToBrokerUrl). Used for control-plane operations that aren't tied to a specific
+     * broker (e.g. scalable-topic subscribe/namespace-watch) when connecting through a proxy.
+     */
+    public CompletableFuture<ClientCnx> getAnyBrokerProxyConnection() {
+        if (!lookup.isBinaryProtoLookupService()) {
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
+                    "Can't pair to any broker through an HTTP service URL", null));
+        }
+        return getConnection(PulsarChannelInitializer.PROXY_TO_ANY_BROKER, lookup.resolveHost(),
+                cnxPool.genRandomKeyToSelectCon());
+    }
+
     public CompletableFuture<ClientCnx> getProxyConnection(final InetSocketAddress logicalAddress,
                                                            final int randomKeyForSelectConnection) {
         if (!lookup.isBinaryProtoLookupService()) {
diff --git a/pulsar-proxy/build.gradle.kts b/pulsar-proxy/build.gradle.kts
index 19173ff99e5..82393b2e486 100644
--- a/pulsar-proxy/build.gradle.kts
+++ b/pulsar-proxy/build.gradle.kts
@@ -68,6 +68,8 @@ dependencies {
     testImplementation(project(":pulsar-broker"))
     testImplementation(project(path = ":pulsar-broker", configuration = "testJar"))
     testImplementation(project(":pulsar-client-admin-original"))
+    testImplementation(project(":pulsar-client-v5"))
+    testImplementation(project(":pulsar-client-api-v5"))
     testImplementation(project(":testmocks"))
     testImplementation(libs.asynchttpclient)
     testImplementation(libs.avro)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index bfdfb83f8cc..53bd36f704d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -32,6 +32,8 @@ import io.netty.resolver.dns.DnsAddressResolverGroup;
 import io.netty.util.concurrent.ScheduledFuture;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.channels.ClosedChannelException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -377,10 +379,22 @@ public class ProxyConnection extends PulsarHandler {
                 .attr("hasProxyToBrokerUrl", hasProxyToBrokerUrl)
                 .log("complete connection, init proxy handler. authenticated with role");
         if (hasProxyToBrokerUrl) {
-            // Optimize proxy connection to fail-fast if the target broker isn't active
-            // Pulsar client will retry connecting after a back off timeout
-            if (service.getConfiguration().isCheckActiveBrokers()
+            if (proxyToBrokerUrl.isBlank()) {
+                // An empty proxyToBrokerUrl is the "pair me to any broker" sentinel: the
+                // client (e.g. a scalable-topic control connection) doesn't target a specific
+                // broker, so the proxy selects one and bridges the connection to it.
+                String anyBroker = selectAnyBrokerHostAndPort();
+                if (anyBroker == null) {
+                    state = State.Closing;
+                    writeAndFlushAndClose(Commands.newError(-1,
+                            ServerError.ServiceNotReady, "No broker available to proxy the connection."));
+                    return;
+                }
+                proxyToBrokerUrl = anyBroker;
+            } else if (service.getConfiguration().isCheckActiveBrokers()
                     && !isBrokerActive(proxyToBrokerUrl)) {
+                // Optimize proxy connection to fail-fast if the target broker isn't active
+                // Pulsar client will retry connecting after a back off timeout
                 state = State.Closing;
                 log.warn()
                         .attr("remoteAddress", remoteAddress)
@@ -459,6 +473,44 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    /**
+     * Select a broker for an "any broker" proxy pairing (empty proxyToBrokerUrl). Returns the
+     * broker as {@code host:port} (the format {@link BrokerProxyValidator} expects), or
+     * {@code null} if no broker is available.
+     */
+    private String selectAnyBrokerHostAndPort() {
+        boolean tls = service.getConfiguration().isTlsEnabledWithBroker();
+        String brokerUrl = tls
+                ? service.getConfiguration().getBrokerServiceURLTLS()
+                : service.getConfiguration().getBrokerServiceURL();
+        if (brokerUrl == null || brokerUrl.isBlank()) {
+            try {
+                ServiceLookupData broker = service.getDiscoveryProvider().nextBroker();
+                brokerUrl = tls ? broker.getPulsarServiceUrlTls() : broker.getPulsarServiceUrl();
+            } catch (Exception e) {
+                log.warn()
+                        .attr("remoteAddress", remoteAddress)
+                        .exception(e)
+                        .log("Failed to select a broker for any-broker proxying");
+                return null;
+            }
+        }
+        if (brokerUrl == null || brokerUrl.isBlank()) {
+            return null;
+        }
+        try {
+            URI uri = new URI(brokerUrl);
+            if (uri.getHost() == null || uri.getPort() < 0) {
+                log.warn().attr("brokerUrl", brokerUrl).log("Broker URL is missing host or port");
+                return null;
+            }
+            return uri.getHost() + ":" + uri.getPort();
+        } catch (URISyntaxException e) {
+            log.warn().attr("brokerUrl", brokerUrl).exception(e).log("Invalid broker URL");
+            return null;
+        }
+    }
+
     private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
         assert ctx.executor().inEventLoop();
         if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyScalableTopicsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyScalableTopicsTest.java
new file mode 100644
index 00000000000..001a2b201b1
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyScalableTopicsTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end verification that scalable topics ({@code topic://}) work through the Pulsar proxy
+ * with the V5 client.
+ *
+ * <p>Exercises every control- and data-plane path that has to traverse the proxy:
+ * <ul>
+ *   <li>the DAG-watch lookup and per-segment producer/consumer connections ride the proxy's
+ *       transparent (direct) mode via ordinary {@code topic://} / {@code segment://} lookups;</li>
+ *   <li>the consumer's controller subscribe isn't tied to a specific broker, so it's sent over an
+ *       any-broker connection that the proxy pairs to a broker it selects.</li>
+ * </ul>
+ */
+public class ProxyScalableTopicsTest extends ProxyMultiBrokerBaseTest {
+
+    @Test
+    public void testProduceAndConsumeScalableTopicThroughProxy() throws Exception {
+        // Two segments so the topic can span more than one broker behind the proxy.
+        String topic = "topic://public/default/" + BrokerTestUtil.newUniqueName("scalable-proxy");
+        admin.scalableTopics().createScalableTopic(topic, 2);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getProxyServiceUrl())
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        QueueConsumer<String> consumer = client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("proxy-sub")
+                .subscribe();
+
+        int numMessages = 20;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value("msg-" + i).send();
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+            assertNotNull(msg, "consumer must receive a message within the timeout through the proxy");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received.size(), numMessages, "every produced message must be received through the proxy");
+    }
+}

Reply via email to