This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f3577a735 [improve][broker] PIP-307: Add proxy support for Java client (#21789)
32f3577a735 is described below

commit 32f3577a735581096d85aa961d7df45b9ae9b6f9
Author: Dragos Misca <[email protected]>
AuthorDate: Fri Dec 22 19:50:40 2023 -0800

    [improve][broker] PIP-307: Add proxy support for Java client (#21789)
---
 .../buffer/impl/TransactionBufferHandlerImpl.java  |   3 +-
 .../pulsar/broker/admin/TopicAutoCreationTest.java |   6 +-
 .../buffer/TransactionBufferClientTest.java        |  13 +-
 .../buffer/TransactionBufferHandlerImplTest.java   |   7 +-
 ...MultiListenersWithInternalListenerNameTest.java |  31 +-
 .../pulsar/client/impl/PulsarTestClient.java       |   2 +-
 .../client/impl/BinaryProtoLookupService.java      |  18 +-
 .../pulsar/client/impl/ConnectionHandler.java      |  21 +-
 .../pulsar/client/impl/HttpLookupService.java      |   6 +-
 .../apache/pulsar/client/impl/LookupService.java   |   6 +-
 .../pulsar/client/impl/LookupTopicResult.java      |  35 +++
 .../pulsar/client/impl/PulsarClientImpl.java       |  27 +-
 .../client/impl/BinaryProtoLookupServiceTest.java  |  18 +-
 .../pulsar/client/impl/ClientTestFixtures.java     |   3 +-
 .../pulsar/client/impl/PulsarClientImplTest.java   |   5 +-
 .../pulsar/client/impl/TopicListWatcherTest.java   |   7 +-
 .../server/ProxyWithExtensibleLoadManagerTest.java | 337 +++++++++++++++++++++
 17 files changed, 468 insertions(+), 77 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 9aac9ab64d0..34ee28693b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
@@ -300,7 +301,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
     }
 
     public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
-        return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
+        return pulsarClient.getConnection(topic, 
randomKeyForSelectConnection).thenApply(Pair::getLeft);
     }
 
     public CompletableFuture<ClientCnx> getClientCnx(String topic) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c9138beee52..9cd1cf214f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -32,7 +32,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -40,6 +39,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.LookupTopicResult;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -135,10 +135,10 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
             ((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
             when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(
                     i -> CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0)));
-            when(mockLookup.getBroker(any())).thenAnswer(i -> {
+            when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
                 InetSocketAddress brokerAddress =
                         new InetSocketAddress(pulsar.getAdvertisedAddress(), 
pulsar.getBrokerListenPort().get());
-                return 
CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
+                return CompletableFuture.completedFuture(new 
LookupTopicResult(brokerAddress, brokerAddress, false));
             });
             final String topicPoliciesServiceInitException
                     = "Topic creation encountered an exception by initialize 
topic policies service";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 1684b2ca138..864b481b72a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -36,6 +36,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
@@ -270,9 +271,10 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
-        
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
-        when(((PulsarClientImpl)mockClient).getConnection(anyString(), 
anyInt())).thenReturn(completableFuture);
-        when(((PulsarClientImpl)mockClient).getConnection(any(), any(), 
anyInt())).thenReturn(completableFuture);
+        
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+        when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
+                CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
+        when(mockClient.getConnection(any(), any(), 
anyInt())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
@@ -324,10 +326,9 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
         ConnectionPool connectionPool = mock(ConnectionPool.class);
         when(mockClient.getCnxPool()).thenReturn(connectionPool);
-        CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
-        completableFuture.complete(clientCnx);
-        when(((PulsarClientImpl)mockClient).getConnection(anyString(), 
anyInt())).thenReturn(completableFuture);
+        when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
+                CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index 278cdbac1f0..633671420e5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
@@ -57,9 +58,9 @@ public class TransactionBufferHandlerImplTest {
         
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
         Optional<NamespaceEphemeralData> opData = Optional.empty();
         
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
-        when(((PulsarClientImpl)pulsarClient).getConnection(anyString(), 
anyInt()))
-                
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
-        when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
+        when(pulsarClient.getConnection(anyString(), anyInt()))
+                
.thenReturn(CompletableFuture.completedFuture(Pair.of(mock(ClientCnx.class), 
false)));
+        when(pulsarClient.getConnection(anyString()))
                 
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
         TransactionBufferHandlerImpl handler = spy(new 
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
         doNothing().when(handler).endTxn(any());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index 8365b7a5557..956b834e334 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -140,21 +140,21 @@ public class 
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
         LookupService lookupService = useHttp ? new HttpLookupService(conf, 
eventExecutors) :
                 new BinaryProtoLookupService((PulsarClientImpl) 
this.pulsarClient,
                 lookupUrl.toString(), "internal", false, this.executorService);
+        TopicName topicName = 
TopicName.get("persistent://public/default/test");
+
         // test request 1
         {
-            CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
future =
-                    
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
-            Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, 
TimeUnit.SECONDS);
-            Assert.assertEquals(result.getKey(), brokerAddress);
-            Assert.assertEquals(result.getValue(), brokerAddress);
+            var result = lookupService.getBroker(topicName).get(10, 
TimeUnit.SECONDS);
+            Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
+            Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
+            Assert.assertEquals(result.isUseProxy(), false);
         }
         // test request 2
         {
-            CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
future =
-                    
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
-            Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, 
TimeUnit.SECONDS);
-            Assert.assertEquals(result.getKey(), brokerAddress);
-            Assert.assertEquals(result.getValue(), brokerAddress);
+            var result = lookupService.getBroker(topicName).get(10, 
TimeUnit.SECONDS);
+            Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
+            Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
+            Assert.assertEquals(result.isUseProxy(), false);
         }
     }
 
@@ -187,12 +187,11 @@ public class 
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
         doReturn(CompletableFuture.completedFuture(optional), 
CompletableFuture.completedFuture(optional2))
                 .when(namespaceService).getBrokerServiceUrlAsync(any(), any());
 
-        CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
-                
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
-
-        Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, 
TimeUnit.SECONDS);
-        Assert.assertEquals(result.getKey(), address);
-        Assert.assertEquals(result.getValue(), address);
+        var result =
+                
lookupService.getBroker(TopicName.get("persistent://public/default/test")).get(10,
 TimeUnit.SECONDS);
+        Assert.assertEquals(result.getLogicalAddress(), address);
+        Assert.assertEquals(result.getPhysicalAddress(), address);
+        Assert.assertEquals(result.isUseProxy(), false);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index 8126ba1bba9..ab273913fde 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -125,7 +125,7 @@ public class PulsarTestClient extends PulsarClientImpl {
             result.completeExceptionally(new IOException("New connections are 
rejected."));
             return result;
         } else {
-            return super.getConnection(topic, 
getCnxPool().genRandomKeyToSelectCon());
+            return super.getConnection(topic);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 8ceb8e44975..bdf00844c1c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
@@ -58,7 +57,7 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final String listenerName;
     private final int maxLookupRedirects;
 
-    private final ConcurrentHashMap<TopicName, 
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
+    private final ConcurrentHashMap<TopicName, 
CompletableFuture<LookupTopicResult>>
             lookupInProgress = new ConcurrentHashMap<>();
 
     private final ConcurrentHashMap<TopicName, 
CompletableFuture<PartitionedTopicMetadata>>
@@ -99,11 +98,11 @@ public class BinaryProtoLookupService implements 
LookupService {
      *            topic-name
      * @return broker-socket-address that serves given topic
      */
-    public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
getBroker(TopicName topicName) {
+    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) 
{
         final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
         try {
             return lookupInProgress.computeIfAbsent(topicName, tpName -> {
-                CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
newFuture =
+                CompletableFuture<LookupTopicResult> newFuture =
                         findBroker(serviceNameResolver.resolveHost(), false, 
topicName, 0);
                 newFutureCreated.setValue(newFuture);
                 return newFuture;
@@ -139,9 +138,9 @@ public class BinaryProtoLookupService implements 
LookupService {
         }
     }
 
-    private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
findBroker(InetSocketAddress socketAddress,
+    private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress 
socketAddress,
             boolean authoritative, TopicName topicName, final int 
redirectCount) {
-        CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
addressFuture = new CompletableFuture<>();
+        CompletableFuture<LookupTopicResult> addressFuture = new 
CompletableFuture<>();
 
         if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
             addressFuture.completeExceptionally(
@@ -159,7 +158,6 @@ public class BinaryProtoLookupService implements 
LookupService {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Lookup response exception: {}", 
topicName, t);
                     }
-
                     addressFuture.completeExceptionally(t);
                 } else {
                     URI uri = null;
@@ -198,10 +196,12 @@ public class BinaryProtoLookupService implements 
LookupService {
                             // (3) received correct broker to connect
                             if (r.proxyThroughServiceUrl) {
                                 // Connect through proxy
-                                
addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
+                                addressFuture.complete(
+                                        new 
LookupTopicResult(responseBrokerAddress, socketAddress, true));
                             } else {
                                 // Normal result with direct connection to 
broker
-                                
addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
+                                addressFuture.complete(
+                                        new 
LookupTopicResult(responseBrokerAddress, responseBrokerAddress, false));
                             }
                         }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 178046864c9..7700596dca3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -47,6 +47,8 @@ public class ConnectionHandler {
     private final AtomicBoolean duringConnect = new AtomicBoolean(false);
     protected final int randomKeyForSelectConnection;
 
+    private volatile Boolean useProxy;
+
     interface Connection {
 
         /**
@@ -93,11 +95,14 @@ public class ConnectionHandler {
 
         try {
             CompletableFuture<ClientCnx> cnxFuture;
-            if (hostURI.isPresent()) {
-                InetSocketAddress address = InetSocketAddress.createUnresolved(
-                        hostURI.get().getHost(),
-                        hostURI.get().getPort());
-                cnxFuture = state.client.getConnection(address, address, 
randomKeyForSelectConnection);
+            if (hostURI.isPresent() && useProxy != null) {
+                URI uri = hostURI.get();
+                InetSocketAddress address = 
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+                if (useProxy) {
+                    cnxFuture = state.client.getProxyConnection(address, 
randomKeyForSelectConnection);
+                } else {
+                    cnxFuture = state.client.getConnection(address, address, 
randomKeyForSelectConnection);
+                }
             } else if (state.redirectedClusterURI != null) {
                 if (state.topic == null) {
                     InetSocketAddress address = 
InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
@@ -112,7 +117,11 @@ public class ConnectionHandler {
             } else if (state.topic == null) {
                 cnxFuture = state.client.getConnectionToServiceUrl();
             } else {
-                cnxFuture = state.client.getConnection(state.topic, 
randomKeyForSelectConnection);
+                cnxFuture = state.client.getConnection(state.topic, 
randomKeyForSelectConnection).thenApply(
+                        connectionResult -> {
+                            useProxy = connectionResult.getRight();
+                            return connectionResult.getLeft();
+                        });
             }
             cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
                     .thenAccept(__ -> duringConnect.set(false))
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index e33efabcc9e..02d0d10626f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -81,7 +80,7 @@ public class HttpLookupService implements LookupService {
      */
     @Override
     @SuppressWarnings("deprecation")
-    public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
getBroker(TopicName topicName) {
+    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) 
{
         String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
         String path = basePath + topicName.getLookupName();
         path = StringUtils.isBlank(listenerName) ? path : path + 
"?listenerName=" + Codec.encode(listenerName);
@@ -101,7 +100,8 @@ public class HttpLookupService implements LookupService {
                 }
 
                 InetSocketAddress brokerAddress = 
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
-                return 
CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
+                return CompletableFuture.completedFuture(new 
LookupTopicResult(brokerAddress, brokerAddress,
+                        false /* HTTP lookups never use the proxy */));
             } catch (Exception e) {
                 // Failed to parse url
                 log.warn("[{}] Lookup Failed due to invalid url {}, {}", 
topicName, uri, e.getMessage());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index f0142f3612b..4d59d6591db 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
 import java.net.InetSocketAddress;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.lookup.GetTopicsResult;
@@ -54,9 +53,10 @@ public interface LookupService extends AutoCloseable {
      *
      * @param topicName
      *            topic-name
-     * @return a pair of addresses, representing the logical and physical 
address of the broker that serves given topic
+     * @return a {@link LookupTopicResult} representing the logical and 
physical address of the broker that serves the
+     *         given topic, as well as proxying information.
      */
-    CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
getBroker(TopicName topicName);
+    CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);
 
     /**
      * Returns {@link PartitionedTopicMetadata} for a given topic.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupTopicResult.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupTopicResult.java
new file mode 100644
index 00000000000..9730b5c1da5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupTopicResult.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@ToString
+public class LookupTopicResult {
+    private final InetSocketAddress logicalAddress;
+    private final InetSocketAddress physicalAddress;
+    private final boolean isUseProxy;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 50a3dbfc935..179996f4ea9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Builder;
 import lombok.Getter;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -946,10 +947,12 @@ public class PulsarClientImpl implements PulsarClient {
         conf.setTlsTrustStorePassword(tlsTrustStorePassword);
     }
 
-    public CompletableFuture<ClientCnx> getConnection(final String topic, int 
randomKeyForSelectConnection) {
-        TopicName topicName = TopicName.get(topic);
-        return lookup.getBroker(topicName)
-                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight(), randomKeyForSelectConnection));
+    public CompletableFuture<Pair<ClientCnx, Boolean>> getConnection(String 
topic, int randomKeyForSelectConnection) {
+        CompletableFuture<LookupTopicResult> lookupTopicResult = 
lookup.getBroker(TopicName.get(topic));
+        CompletableFuture<Boolean> isUseProxy = 
lookupTopicResult.thenApply(LookupTopicResult::isUseProxy);
+        return lookupTopicResult.thenCompose(lookupResult -> 
getConnection(lookupResult.getLogicalAddress(),
+                        lookupResult.getPhysicalAddress(), 
randomKeyForSelectConnection)).
+                thenCombine(isUseProxy, Pair::of);
     }
 
     /**
@@ -957,15 +960,14 @@ public class PulsarClientImpl implements PulsarClient {
      */
     @VisibleForTesting
     public CompletableFuture<ClientCnx> getConnection(final String topic) {
-        TopicName topicName = TopicName.get(topic);
-        return lookup.getBroker(topicName)
-                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
+        return getConnection(topic, 
cnxPool.genRandomKeyToSelectCon()).thenApply(Pair::getLeft);
     }
 
     public CompletableFuture<ClientCnx> getConnection(final String topic, 
final String url) {
         TopicName topicName = TopicName.get(topic);
         return getLookup(url).getBroker(topicName)
-                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
+                .thenCompose(lookupResult -> 
getConnection(lookupResult.getLogicalAddress(),
+                        lookupResult.getPhysicalAddress(), 
cnxPool.genRandomKeyToSelectCon()));
     }
 
     public LookupService getLookup(String serviceUrl) {
@@ -988,6 +990,15 @@ public class PulsarClientImpl implements PulsarClient {
         return getConnection(address, address, 
cnxPool.genRandomKeyToSelectCon());
     }
 
+    public CompletableFuture<ClientCnx> getProxyConnection(final 
InetSocketAddress logicalAddress,
+                                                           final int 
randomKeyForSelectConnection) {
+        if (!(lookup instanceof BinaryProtoLookupService)) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.InvalidServiceURL(
+                    "Cannot proxy connection through HTTP service URL", null));
+        }
+        return getConnection(logicalAddress, lookup.resolveHost(), 
randomKeyForSelectConnection);
+    }
+
     public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
logicalAddress,
                                                       final InetSocketAddress 
physicalAddress,
                                                       final int 
randomKeyForSelectConnection) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 0254cf8d44c..87188255b20 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -27,16 +27,12 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import io.netty.buffer.ByteBuf;
-
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException.LookupException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -80,11 +76,12 @@ public class BinaryProtoLookupServiceTest {
 
     @Test(invocationTimeOut = 3000)
     public void maxLookupRedirectsTest1() throws Exception {
-        Pair<InetSocketAddress, InetSocketAddress> addressPair = lookup.getBroker(topicName).get();
-        assertEquals(addressPair.getLeft(), InetSocketAddress
+        LookupTopicResult lookupResult = lookup.getBroker(topicName).get();
+        assertEquals(lookupResult.getLogicalAddress(), InetSocketAddress
                 .createUnresolved("broker2.pulsar.apache.org" ,6650));
-        assertEquals(addressPair.getRight(), InetSocketAddress
+        assertEquals(lookupResult.getPhysicalAddress(), InetSocketAddress
                 .createUnresolved("broker2.pulsar.apache.org" ,6650));
+        assertEquals(lookupResult.isUseProxy(), false);
     }
 
     @Test(invocationTimeOut = 3000)
@@ -93,11 +90,12 @@ public class BinaryProtoLookupServiceTest {
         field.setAccessible(true);
         field.set(lookup, 2);
 
-        Pair<InetSocketAddress, InetSocketAddress> addressPair = lookup.getBroker(topicName).get();
-        assertEquals(addressPair.getLeft(), InetSocketAddress
+        LookupTopicResult lookupResult = lookup.getBroker(topicName).get();
+        assertEquals(lookupResult.getLogicalAddress(), InetSocketAddress
                 .createUnresolved("broker2.pulsar.apache.org" ,6650));
-        assertEquals(addressPair.getRight(), InetSocketAddress
+        assertEquals(lookupResult.getPhysicalAddress(), InetSocketAddress
                 .createUnresolved("broker2.pulsar.apache.org" ,6650));
+        assertEquals(lookupResult.isUseProxy(), false);
     }
 
     @Test(invocationTimeOut = 3000)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 738d969ac74..915c3dcc05a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.mockito.Mockito;
@@ -82,7 +83,7 @@ class ClientTestFixtures {
         when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         when(clientMock.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         when(clientMock.getConnection(anyString(), anyInt()))
-                .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+                .thenReturn(CompletableFuture.completedFuture(Pair.of(clientCnxMock, false)));
         when(clientMock.getConnection(any(), any(), anyInt()))
                 .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 9a4cfce0cc3..c96443c1e2f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -50,7 +50,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.regex.Pattern;
 import lombok.Cleanup;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -110,8 +109,8 @@ public class PulsarClientImplTest {
         when(lookup.getPartitionedTopicMetadata(any(TopicName.class)))
                 .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata()));
         when(lookup.getBroker(any()))
-                .thenReturn(CompletableFuture.completedFuture(
-                        Pair.of(mock(InetSocketAddress.class), mock(InetSocketAddress.class))));
+                .thenReturn(CompletableFuture.completedFuture(new LookupTopicResult(
+                        mock(InetSocketAddress.class), mock(InetSocketAddress.class), false)));
         ConnectionPool pool = mock(ConnectionPool.class);
         ClientCnx cnx = mock(ClientCnx.class);
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index 1b39448fbe7..dd75770b568 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -63,8 +64,8 @@ public class TopicListWatcherTest {
         Timer timer = new HashedWheelTimer();
         when(client.timer()).thenReturn(timer);
         String topic = "persistent://tenant/ns/topic\\d+";
-        when(client.getConnection(topic)).thenReturn(clientCnxFuture);
-        when(client.getConnection(topic, 0)).thenReturn(clientCnxFuture);
+        when(client.getConnection(topic, 0)).
+                thenReturn(clientCnxFuture.thenApply(clientCnx -> Pair.of(clientCnx, false)));
         when(client.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture);
         when(connectionPool.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture);
         watcherFuture = new CompletableFuture<>();
@@ -120,6 +121,4 @@ public class TopicListWatcherTest {
         watcher.handleCommandWatchTopicUpdate(update);
         verify(listener).onTopicsAdded(Collections.singletonList("persistent://tenant/ns/topic12"));
     }
-
-
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
new file mode 100644
index 00000000000..3a787a8b359
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ServiceNameResolver;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.jetbrains.annotations.NotNull;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest {
+
+    private static final int TEST_TIMEOUT_MS = 30_000;
+
+    private ProxyService proxyService;
+
+    @Override
+    public int numberOfAdditionalBrokers() {
+        return 1;
+    }
+
+    @Override
+    public void doInitConf() throws Exception {
+        super.doInitConf();
+        configureExtensibleLoadManager(conf);
+    }
+
+    @Override
+    protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
+        return configureExtensibleLoadManager(getDefaultConf());
+    }
+
+    private ServiceConfiguration configureExtensibleLoadManager(ServiceConfiguration config) {
+        config.setNumIOThreads(8);
+        config.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000);
+        config.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1);
+        config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        config.setLoadBalancerSheddingEnabled(false);
+        return config;
+    }
+
+    private ProxyConfiguration initializeProxyConfig() {
+        var proxyConfig = new ProxyConfiguration();
+        proxyConfig.setNumIOThreads(8);
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
+        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        return proxyConfig;
+    }
+
+    private <T> T spyField(Object target, String fieldName) throws IllegalAccessException {
+        T t = (T) FieldUtils.readDeclaredField(target, fieldName, true);
+        var fieldSpy = spy(t);
+        FieldUtils.writeDeclaredField(target, fieldName, fieldSpy, true);
+        return fieldSpy;
+    }
+
+    private PulsarClientImpl createClient(ProxyService proxyService) {
+        try {
+            return Mockito.spy((PulsarClientImpl) PulsarClient.builder().
+                    serviceUrl(proxyService.getServiceUrl()).
+                    build());
+        } catch (PulsarClientException e) {
+            throw new CompletionException(e);
+        }
+    }
+
+    @NotNull
+    private InetSocketAddress getSourceBrokerInetAddress(TopicName topicName) throws PulsarAdminException {
+        var srcBrokerUrl = admin.lookups().lookupTopic(topicName.toString());
+        var serviceUri = ServiceURI.create(srcBrokerUrl);
+        var uri = serviceUri.getUri();
+        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+    }
+
+    private String getDstBrokerLookupUrl(TopicName topicName) throws Exception {
+        var srcBrokerUrl = admin.lookups().lookupTopic(topicName.toString());
+        return getAllBrokers().stream().
+                filter(pulsarService -> !Objects.equals(srcBrokerUrl, pulsarService.getBrokerServiceUrl())).
+                map(PulsarService::getLookupServiceAddress).
+                findAny().orElseThrow(() -> new Exception("Could not determine destination broker lookup URL"));
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void proxySetup() throws Exception {
+        var proxyConfig = initializeProxyConfig();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
+        proxyService.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void proxyCleanup() throws Exception {
+        if (proxyService != null) {
+            proxyService.close();
+        }
+    }
+
+    @Test(timeOut = TEST_TIMEOUT_MS)
+    public void testProxyProduceConsume() throws Exception {
+        var namespaceName = NamespaceName.get("public", "default");
+        var topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                BrokerTestUtil.newUniqueName("testProxyProduceConsume"));
+
+        @Cleanup("shutdownNow")
+        var threadPool = Executors.newCachedThreadPool();
+
+        var producerClientFuture = CompletableFuture.supplyAsync(() -> createClient(proxyService), threadPool);
+        var consumerClientFuture = CompletableFuture.supplyAsync(() -> createClient(proxyService), threadPool);
+
+        @Cleanup
+        var producerClient = producerClientFuture.get();
+        @Cleanup
+        var producer = producerClient.newProducer(Schema.INT32).topic(topicName.toString()).create();
+        LookupService producerLookupServiceSpy = spyField(producerClient, "lookup");
+
+        @Cleanup
+        var consumerClient = consumerClientFuture.get();
+        @Cleanup
+        var consumer = consumerClient.newConsumer(Schema.INT32).topic(topicName.toString()).
+                subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).
+                subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
+                ackTimeout(1000, TimeUnit.MILLISECONDS).
+                subscribe();
+        LookupService consumerLookupServiceSpy = spyField(consumerClient, "lookup");
+
+        var bundleRange = admin.lookups().getBundleRange(topicName.toString());
+
+        var semSend = new Semaphore(0);
+        var messagesBeforeUnload = 100;
+        var messagesAfterUnload = 100;
+
+        var pendingMessageIds = Collections.synchronizedSet(new HashSet<Integer>());
+        var producerFuture = CompletableFuture.runAsync(() -> {
+            try {
+                for (int i = 0; i < messagesBeforeUnload + messagesAfterUnload; i++) {
+                    semSend.acquire();
+                    pendingMessageIds.add(i);
+                    producer.send(i);
+                }
+            } catch (Exception e) {
+                throw new CompletionException(e);
+            }
+        }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+        var consumerFuture = CompletableFuture.runAsync(() -> {
+            while (!producerFuture.isDone() || !pendingMessageIds.isEmpty()) {
+                try {
+                    var recvMessage = consumer.receive(1_500, TimeUnit.MILLISECONDS);
+                    if (recvMessage != null) {
+                        consumer.acknowledge(recvMessage);
+                        pendingMessageIds.remove(recvMessage.getValue());
+                    }
+                } catch (PulsarClientException e) {
+                    // Retry
+                }
+            }
+        }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+        var dstBrokerLookupUrl = getDstBrokerLookupUrl(topicName);
+        semSend.release(messagesBeforeUnload);
+        admin.namespaces().unloadNamespaceBundle(namespaceName.toString(), bundleRange, dstBrokerLookupUrl);
+        semSend.release(messagesAfterUnload);
+
+        // Verify all futures completed successfully.
+        producerFuture.get();
+        consumerFuture.get();
+
+        verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
+        verify(producerLookupServiceSpy, never()).getBroker(topicName);
+
+        verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
+        verify(consumerLookupServiceSpy, never()).getBroker(topicName);
+    }
+
+    @Test(timeOut = TEST_TIMEOUT_MS)
+    public void testClientReconnectsToBrokerOnProxyClosing() throws Exception {
+        var namespaceName = NamespaceName.get("public", "default");
+        var topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                BrokerTestUtil.newUniqueName("testClientReconnectsToBrokerOnProxyClosing"));
+
+        @Cleanup("shutdownNow")
+        var threadPool = Executors.newCachedThreadPool();
+
+        var producerClientFuture = CompletableFuture.supplyAsync(() -> createClient(proxyService), threadPool);
+        var consumerClientFuture = CompletableFuture.supplyAsync(() -> createClient(proxyService), threadPool);
+
+        @Cleanup
+        var producerClient = producerClientFuture.get();
+        @Cleanup
+        var producer = (ProducerImpl<Integer>) producerClient.newProducer(Schema.INT32).topic(topicName.toString()).
+                create();
+        LookupService producerLookupServiceSpy = spyField(producerClient, "lookup");
+        when(((ServiceNameResolver) spyField(producerLookupServiceSpy, "serviceNameResolver")).resolveHost()).
+                thenCallRealMethod().then(invocation -> getSourceBrokerInetAddress(topicName));
+
+        @Cleanup
+        var consumerClient = consumerClientFuture.get();
+        @Cleanup
+        var consumer = (ConsumerImpl<Integer>) consumerClient.newConsumer(Schema.INT32).topic(topicName.toString()).
+                subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).
+                subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
+                ackTimeout(1000, TimeUnit.MILLISECONDS).
+                subscribe();
+        LookupService consumerLookupServiceSpy = spyField(consumerClient, "lookup");
+        when(((ServiceNameResolver) spyField(consumerLookupServiceSpy, "serviceNameResolver")).resolveHost()).
+                thenCallRealMethod().then(invocation -> getSourceBrokerInetAddress(topicName));
+
+        var bundleRange = admin.lookups().getBundleRange(topicName.toString());
+
+        var semSend = new Semaphore(0);
+        var messagesPerPhase = 100;
+        var phases = 4;
+        var totalMessages = messagesPerPhase * phases;
+        var cdlSentMessages = new CountDownLatch(messagesPerPhase * 2);
+
+        var pendingMessageIds = Collections.synchronizedSet(new HashSet<Integer>());
+        var producerFuture = CompletableFuture.runAsync(() -> {
+            try {
+                for (int i = 0; i < totalMessages; i++) {
+                    semSend.acquire();
+                    pendingMessageIds.add(i);
+                    producer.send(i);
+                    cdlSentMessages.countDown();
+                }
+            } catch (Exception e) {
+                throw new CompletionException(e);
+            }
+        }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+        var consumerFuture = CompletableFuture.runAsync(() -> {
+            while (!producerFuture.isDone() || !pendingMessageIds.isEmpty()) {
+                try {
+                    var recvMessage = consumer.receive(1_500, TimeUnit.MILLISECONDS);
+                    if (recvMessage != null) {
+                        consumer.acknowledge(recvMessage);
+                        pendingMessageIds.remove(recvMessage.getValue());
+                    }
+                } catch (PulsarClientException e) {
+                    // Retry
+                }
+            }
+        }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+        var dstBrokerLookupUrl = getDstBrokerLookupUrl(topicName);
+        semSend.release(messagesPerPhase);
+        admin.namespaces().unloadNamespaceBundle(namespaceName.toString(), bundleRange, dstBrokerLookupUrl);
+        semSend.release(messagesPerPhase);
+
+        cdlSentMessages.await();
+        assertEquals(FieldUtils.readDeclaredField(producer.getConnectionHandler(), "useProxy", true), Boolean.TRUE);
+        assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(), "useProxy", true), Boolean.TRUE);
+        semSend.release(messagesPerPhase);
+        proxyService.close();
+        proxyService = null;
+        semSend.release(messagesPerPhase);
+
+        // Verify produce/consume futures completed successfully.
+        producerFuture.get();
+        consumerFuture.get();
+
+        assertEquals(FieldUtils.readDeclaredField(producer.getConnectionHandler(), "useProxy", true), Boolean.FALSE);
+        assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(), "useProxy", true), Boolean.FALSE);
+
+        verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
+        verify(producerLookupServiceSpy, times(1)).getBroker(topicName);
+
+        verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
+        verify(consumerLookupServiceSpy, times(1)).getBroker(topicName);
+    }
+}

Reply via email to