This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 32f3577a735 [improve][broker] PIP-307: Add proxy support for Java
client (#21789)
32f3577a735 is described below
commit 32f3577a735581096d85aa961d7df45b9ae9b6f9
Author: Dragos Misca <[email protected]>
AuthorDate: Fri Dec 22 19:50:40 2023 -0800
[improve][broker] PIP-307: Add proxy support for Java client (#21789)
---
.../buffer/impl/TransactionBufferHandlerImpl.java | 3 +-
.../pulsar/broker/admin/TopicAutoCreationTest.java | 6 +-
.../buffer/TransactionBufferClientTest.java | 13 +-
.../buffer/TransactionBufferHandlerImplTest.java | 7 +-
...MultiListenersWithInternalListenerNameTest.java | 31 +-
.../pulsar/client/impl/PulsarTestClient.java | 2 +-
.../client/impl/BinaryProtoLookupService.java | 18 +-
.../pulsar/client/impl/ConnectionHandler.java | 21 +-
.../pulsar/client/impl/HttpLookupService.java | 6 +-
.../apache/pulsar/client/impl/LookupService.java | 6 +-
.../pulsar/client/impl/LookupTopicResult.java | 35 +++
.../pulsar/client/impl/PulsarClientImpl.java | 27 +-
.../client/impl/BinaryProtoLookupServiceTest.java | 18 +-
.../pulsar/client/impl/ClientTestFixtures.java | 3 +-
.../pulsar/client/impl/PulsarClientImplTest.java | 5 +-
.../pulsar/client/impl/TopicListWatcherTest.java | 7 +-
.../server/ProxyWithExtensibleLoadManagerTest.java | 337 +++++++++++++++++++++
17 files changed, 468 insertions(+), 77 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 9aac9ab64d0..34ee28693b4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
@@ -300,7 +301,7 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
}
public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
- return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
+ return pulsarClient.getConnection(topic,
randomKeyForSelectConnection).thenApply(Pair::getLeft);
}
public CompletableFuture<ClientCnx> getClientCnx(String topic) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c9138beee52..9cd1cf214f6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -32,7 +32,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
@@ -40,6 +39,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -135,10 +135,10 @@ public class TopicAutoCreationTest extends
ProducerConsumerBase {
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(
i -> CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0)));
- when(mockLookup.getBroker(any())).thenAnswer(i -> {
+ when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(),
pulsar.getBrokerListenPort().get());
- return
CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
+ return CompletableFuture.completedFuture(new
LookupTopicResult(brokerAddress, brokerAddress, false));
});
final String topicPoliciesServiceInitException
= "Topic creation encountered an exception by initialize
topic policies service";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 1684b2ca138..864b481b72a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -36,6 +36,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
@@ -270,9 +271,10 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
-
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
- when(((PulsarClientImpl)mockClient).getConnection(anyString(),
anyInt())).thenReturn(completableFuture);
- when(((PulsarClientImpl)mockClient).getConnection(any(), any(),
anyInt())).thenReturn(completableFuture);
+
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+ when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
+ CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
+ when(mockClient.getConnection(any(), any(),
anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
@@ -324,10 +326,9 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
- CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
- completableFuture.complete(clientCnx);
- when(((PulsarClientImpl)mockClient).getConnection(anyString(),
anyInt())).thenReturn(completableFuture);
+ when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
+ CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index 278cdbac1f0..633671420e5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
@@ -57,9 +58,9 @@ public class TransactionBufferHandlerImplTest {
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
Optional<NamespaceEphemeralData> opData = Optional.empty();
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
- when(((PulsarClientImpl)pulsarClient).getConnection(anyString(),
anyInt()))
-
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
- when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
+ when(pulsarClient.getConnection(anyString(), anyInt()))
+
.thenReturn(CompletableFuture.completedFuture(Pair.of(mock(ClientCnx.class),
false)));
+ when(pulsarClient.getConnection(anyString()))
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index 8365b7a5557..956b834e334 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -140,21 +140,21 @@ public class
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
LookupService lookupService = useHttp ? new HttpLookupService(conf,
eventExecutors) :
new BinaryProtoLookupService((PulsarClientImpl)
this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
+ TopicName topicName =
TopicName.get("persistent://public/default/test");
+
// test request 1
{
- CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
future =
-
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
- Pair<InetSocketAddress, InetSocketAddress> result = future.get(10,
TimeUnit.SECONDS);
- Assert.assertEquals(result.getKey(), brokerAddress);
- Assert.assertEquals(result.getValue(), brokerAddress);
+ var result = lookupService.getBroker(topicName).get(10,
TimeUnit.SECONDS);
+ Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
+ Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
+ Assert.assertEquals(result.isUseProxy(), false);
}
// test request 2
{
- CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
future =
-
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
- Pair<InetSocketAddress, InetSocketAddress> result = future.get(10,
TimeUnit.SECONDS);
- Assert.assertEquals(result.getKey(), brokerAddress);
- Assert.assertEquals(result.getValue(), brokerAddress);
+ var result = lookupService.getBroker(topicName).get(10,
TimeUnit.SECONDS);
+ Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
+ Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
+ Assert.assertEquals(result.isUseProxy(), false);
}
}
@@ -187,12 +187,11 @@ public class
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
doReturn(CompletableFuture.completedFuture(optional),
CompletableFuture.completedFuture(optional2))
.when(namespaceService).getBrokerServiceUrlAsync(any(), any());
- CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
-
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
-
- Pair<InetSocketAddress, InetSocketAddress> result = future.get(10,
TimeUnit.SECONDS);
- Assert.assertEquals(result.getKey(), address);
- Assert.assertEquals(result.getValue(), address);
+ var result =
+
lookupService.getBroker(TopicName.get("persistent://public/default/test")).get(10,
TimeUnit.SECONDS);
+ Assert.assertEquals(result.getLogicalAddress(), address);
+ Assert.assertEquals(result.getPhysicalAddress(), address);
+ Assert.assertEquals(result.isUseProxy(), false);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index 8126ba1bba9..ab273913fde 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -125,7 +125,7 @@ public class PulsarTestClient extends PulsarClientImpl {
result.completeExceptionally(new IOException("New connections are
rejected."));
return result;
} else {
- return super.getConnection(topic,
getCnxPool().genRandomKeyToSelectCon());
+ return super.getConnection(topic);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 8ceb8e44975..bdf00844c1c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
@@ -58,7 +57,7 @@ public class BinaryProtoLookupService implements
LookupService {
private final String listenerName;
private final int maxLookupRedirects;
- private final ConcurrentHashMap<TopicName,
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
+ private final ConcurrentHashMap<TopicName,
CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();
private final ConcurrentHashMap<TopicName,
CompletableFuture<PartitionedTopicMetadata>>
@@ -99,11 +98,11 @@ public class BinaryProtoLookupService implements
LookupService {
* topic-name
* @return broker-socket-address that serves given topic
*/
- public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
getBroker(TopicName topicName) {
+ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName)
{
final MutableObject<CompletableFuture> newFutureCreated = new
MutableObject<>();
try {
return lookupInProgress.computeIfAbsent(topicName, tpName -> {
- CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
newFuture =
+ CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false,
topicName, 0);
newFutureCreated.setValue(newFuture);
return newFuture;
@@ -139,9 +138,9 @@ public class BinaryProtoLookupService implements
LookupService {
}
}
- private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
findBroker(InetSocketAddress socketAddress,
+ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress
socketAddress,
boolean authoritative, TopicName topicName, final int
redirectCount) {
- CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
addressFuture = new CompletableFuture<>();
+ CompletableFuture<LookupTopicResult> addressFuture = new
CompletableFuture<>();
if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
addressFuture.completeExceptionally(
@@ -159,7 +158,6 @@ public class BinaryProtoLookupService implements
LookupService {
if (log.isDebugEnabled()) {
log.debug("[{}] Lookup response exception: {}",
topicName, t);
}
-
addressFuture.completeExceptionally(t);
} else {
URI uri = null;
@@ -198,10 +196,12 @@ public class BinaryProtoLookupService implements
LookupService {
// (3) received correct broker to connect
if (r.proxyThroughServiceUrl) {
// Connect through proxy
-
addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
+ addressFuture.complete(
+ new
LookupTopicResult(responseBrokerAddress, socketAddress, true));
} else {
// Normal result with direct connection to
broker
-
addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
+ addressFuture.complete(
+ new
LookupTopicResult(responseBrokerAddress, responseBrokerAddress, false));
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 178046864c9..7700596dca3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -47,6 +47,8 @@ public class ConnectionHandler {
private final AtomicBoolean duringConnect = new AtomicBoolean(false);
protected final int randomKeyForSelectConnection;
+ private volatile Boolean useProxy;
+
interface Connection {
/**
@@ -93,11 +95,14 @@ public class ConnectionHandler {
try {
CompletableFuture<ClientCnx> cnxFuture;
- if (hostURI.isPresent()) {
- InetSocketAddress address = InetSocketAddress.createUnresolved(
- hostURI.get().getHost(),
- hostURI.get().getPort());
- cnxFuture = state.client.getConnection(address, address,
randomKeyForSelectConnection);
+ if (hostURI.isPresent() && useProxy != null) {
+ URI uri = hostURI.get();
+ InetSocketAddress address =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+ if (useProxy) {
+ cnxFuture = state.client.getProxyConnection(address,
randomKeyForSelectConnection);
+ } else {
+ cnxFuture = state.client.getConnection(address, address,
randomKeyForSelectConnection);
+ }
} else if (state.redirectedClusterURI != null) {
if (state.topic == null) {
InetSocketAddress address =
InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
@@ -112,7 +117,11 @@ public class ConnectionHandler {
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
- cnxFuture = state.client.getConnection(state.topic,
randomKeyForSelectConnection);
+ cnxFuture = state.client.getConnection(state.topic,
randomKeyForSelectConnection).thenApply(
+ connectionResult -> {
+ useProxy = connectionResult.getRight();
+ return connectionResult.getLeft();
+ });
}
cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
.thenAccept(__ -> duringConnect.set(false))
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index e33efabcc9e..02d0d10626f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -81,7 +80,7 @@ public class HttpLookupService implements LookupService {
*/
@Override
@SuppressWarnings("deprecation")
- public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
getBroker(TopicName topicName) {
+ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName)
{
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path +
"?listenerName=" + Codec.encode(listenerName);
@@ -101,7 +100,8 @@ public class HttpLookupService implements LookupService {
}
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
- return
CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
+ return CompletableFuture.completedFuture(new
LookupTopicResult(brokerAddress, brokerAddress,
+ false /* HTTP lookups never use the proxy */));
} catch (Exception e) {
// Failed to parse url
log.warn("[{}] Lookup Failed due to invalid url {}, {}",
topicName, uri, e.getMessage());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index f0142f3612b..4d59d6591db 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
@@ -54,9 +53,10 @@ public interface LookupService extends AutoCloseable {
*
* @param topicName
* topic-name
- * @return a pair of addresses, representing the logical and physical
address of the broker that serves given topic
+ * @return a {@link LookupTopicResult} representing the logical and
physical address of the broker that serves the
+ * given topic, as well as proxying information.
*/
- CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
getBroker(TopicName topicName);
+ CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);
/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupTopicResult.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupTopicResult.java
new file mode 100644
index 00000000000..9730b5c1da5
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupTopicResult.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@ToString
+public class LookupTopicResult {
+ private final InetSocketAddress logicalAddress;
+ private final InetSocketAddress physicalAddress;
+ private final boolean isUseProxy;
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 50a3dbfc935..179996f4ea9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Builder;
import lombok.Getter;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -946,10 +947,12 @@ public class PulsarClientImpl implements PulsarClient {
conf.setTlsTrustStorePassword(tlsTrustStorePassword);
}
- public CompletableFuture<ClientCnx> getConnection(final String topic, int
randomKeyForSelectConnection) {
- TopicName topicName = TopicName.get(topic);
- return lookup.getBroker(topicName)
- .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight(), randomKeyForSelectConnection));
+ public CompletableFuture<Pair<ClientCnx, Boolean>> getConnection(String
topic, int randomKeyForSelectConnection) {
+ CompletableFuture<LookupTopicResult> lookupTopicResult =
lookup.getBroker(TopicName.get(topic));
+ CompletableFuture<Boolean> isUseProxy =
lookupTopicResult.thenApply(LookupTopicResult::isUseProxy);
+ return lookupTopicResult.thenCompose(lookupResult ->
getConnection(lookupResult.getLogicalAddress(),
+ lookupResult.getPhysicalAddress(),
randomKeyForSelectConnection)).
+ thenCombine(isUseProxy, Pair::of);
}
/**
@@ -957,15 +960,14 @@ public class PulsarClientImpl implements PulsarClient {
*/
@VisibleForTesting
public CompletableFuture<ClientCnx> getConnection(final String topic) {
- TopicName topicName = TopicName.get(topic);
- return lookup.getBroker(topicName)
- .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
+ return getConnection(topic,
cnxPool.genRandomKeyToSelectCon()).thenApply(Pair::getLeft);
}
public CompletableFuture<ClientCnx> getConnection(final String topic,
final String url) {
TopicName topicName = TopicName.get(topic);
return getLookup(url).getBroker(topicName)
- .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
+ .thenCompose(lookupResult ->
getConnection(lookupResult.getLogicalAddress(),
+ lookupResult.getPhysicalAddress(),
cnxPool.genRandomKeyToSelectCon()));
}
public LookupService getLookup(String serviceUrl) {
@@ -988,6 +990,15 @@ public class PulsarClientImpl implements PulsarClient {
return getConnection(address, address,
cnxPool.genRandomKeyToSelectCon());
}
+ public CompletableFuture<ClientCnx> getProxyConnection(final
InetSocketAddress logicalAddress,
+ final int
randomKeyForSelectConnection) {
+ if (!(lookup instanceof BinaryProtoLookupService)) {
+ return FutureUtil.failedFuture(new
PulsarClientException.InvalidServiceURL(
+ "Cannot proxy connection through HTTP service URL", null));
+ }
+ return getConnection(logicalAddress, lookup.resolveHost(),
randomKeyForSelectConnection);
+ }
+
public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress
logicalAddress,
final InetSocketAddress
physicalAddress,
final int
randomKeyForSelectConnection) {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 0254cf8d44c..87188255b20 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -27,16 +27,12 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import io.netty.buffer.ByteBuf;
-
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -80,11 +76,12 @@ public class BinaryProtoLookupServiceTest {
@Test(invocationTimeOut = 3000)
public void maxLookupRedirectsTest1() throws Exception {
- Pair<InetSocketAddress, InetSocketAddress> addressPair =
lookup.getBroker(topicName).get();
- assertEquals(addressPair.getLeft(), InetSocketAddress
+ LookupTopicResult lookupResult = lookup.getBroker(topicName).get();
+ assertEquals(lookupResult.getLogicalAddress(), InetSocketAddress
.createUnresolved("broker2.pulsar.apache.org" ,6650));
- assertEquals(addressPair.getRight(), InetSocketAddress
+ assertEquals(lookupResult.getPhysicalAddress(), InetSocketAddress
.createUnresolved("broker2.pulsar.apache.org" ,6650));
+ assertEquals(lookupResult.isUseProxy(), false);
}
@Test(invocationTimeOut = 3000)
@@ -93,11 +90,12 @@ public class BinaryProtoLookupServiceTest {
field.setAccessible(true);
field.set(lookup, 2);
- Pair<InetSocketAddress, InetSocketAddress> addressPair =
lookup.getBroker(topicName).get();
- assertEquals(addressPair.getLeft(), InetSocketAddress
+ LookupTopicResult lookupResult = lookup.getBroker(topicName).get();
+ assertEquals(lookupResult.getLogicalAddress(), InetSocketAddress
.createUnresolved("broker2.pulsar.apache.org" ,6650));
- assertEquals(addressPair.getRight(), InetSocketAddress
+ assertEquals(lookupResult.getPhysicalAddress(), InetSocketAddress
.createUnresolved("broker2.pulsar.apache.org" ,6650));
+ assertEquals(lookupResult.isUseProxy(), false);
}
@Test(invocationTimeOut = 3000)
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 738d969ac74..915c3dcc05a 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.mockito.Mockito;
@@ -82,7 +83,7 @@ class ClientTestFixtures {
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
when(clientMock.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
when(clientMock.getConnection(anyString(), anyInt()))
- .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+
.thenReturn(CompletableFuture.completedFuture(Pair.of(clientCnxMock, false)));
when(clientMock.getConnection(any(), any(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(clientCnxMock));
ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 9a4cfce0cc3..c96443c1e2f 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -50,7 +50,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;
import lombok.Cleanup;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -110,8 +109,8 @@ public class PulsarClientImplTest {
when(lookup.getPartitionedTopicMetadata(any(TopicName.class)))
.thenReturn(CompletableFuture.completedFuture(new
PartitionedTopicMetadata()));
when(lookup.getBroker(any()))
- .thenReturn(CompletableFuture.completedFuture(
- Pair.of(mock(InetSocketAddress.class),
mock(InetSocketAddress.class))));
+ .thenReturn(CompletableFuture.completedFuture(new
LookupTopicResult(
+ mock(InetSocketAddress.class),
mock(InetSocketAddress.class), false)));
ConnectionPool pool = mock(ConnectionPool.class);
ClientCnx cnx = mock(ClientCnx.class);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index 1b39448fbe7..dd75770b568 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -63,8 +64,8 @@ public class TopicListWatcherTest {
Timer timer = new HashedWheelTimer();
when(client.timer()).thenReturn(timer);
String topic = "persistent://tenant/ns/topic\\d+";
- when(client.getConnection(topic)).thenReturn(clientCnxFuture);
- when(client.getConnection(topic, 0)).thenReturn(clientCnxFuture);
+ when(client.getConnection(topic, 0)).
+ thenReturn(clientCnxFuture.thenApply(clientCnx ->
Pair.of(clientCnx, false)));
when(client.getConnection(any(), any(),
anyInt())).thenReturn(clientCnxFuture);
when(connectionPool.getConnection(any(), any(),
anyInt())).thenReturn(clientCnxFuture);
watcherFuture = new CompletableFuture<>();
@@ -120,6 +121,4 @@ public class TopicListWatcherTest {
watcher.handleCommandWatchTopicUpdate(update);
verify(listener).onTopicsAdded(Collections.singletonList("persistent://tenant/ns/topic12"));
}
-
-
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
new file mode 100644
index 00000000000..3a787a8b359
--- /dev/null
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ServiceNameResolver;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.jetbrains.annotations.NotNull;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest {
+
+ private static final int TEST_TIMEOUT_MS = 30_000;
+
+ private ProxyService proxyService;
+
+ @Override
+ public int numberOfAdditionalBrokers() {
+ return 1;
+ }
+
+ @Override
+ public void doInitConf() throws Exception {
+ super.doInitConf();
+ configureExtensibleLoadManager(conf);
+ }
+
+ @Override
+ protected ServiceConfiguration createConfForAdditionalBroker(int
additionalBrokerIndex) {
+ return configureExtensibleLoadManager(getDefaultConf());
+ }
+
+ private ServiceConfiguration
configureExtensibleLoadManager(ServiceConfiguration config) {
+ config.setNumIOThreads(8);
+ config.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 *
1000);
+ config.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1);
+
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ config.setLoadBalancerSheddingEnabled(false);
+ return config;
+ }
+
+ private ProxyConfiguration initializeProxyConfig() {
+ var proxyConfig = new ProxyConfiguration();
+ proxyConfig.setNumIOThreads(8);
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
+ proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ return proxyConfig;
+ }
+
+ private <T> T spyField(Object target, String fieldName) throws
IllegalAccessException {
+ T t = (T) FieldUtils.readDeclaredField(target, fieldName, true);
+ var fieldSpy = spy(t);
+ FieldUtils.writeDeclaredField(target, fieldName, fieldSpy, true);
+ return fieldSpy;
+ }
+
+ private PulsarClientImpl createClient(ProxyService proxyService) {
+ try {
+ return Mockito.spy((PulsarClientImpl) PulsarClient.builder().
+ serviceUrl(proxyService.getServiceUrl()).
+ build());
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ }
+
+ @NotNull
+ private InetSocketAddress getSourceBrokerInetAddress(TopicName topicName)
throws PulsarAdminException {
+ var srcBrokerUrl = admin.lookups().lookupTopic(topicName.toString());
+ var serviceUri = ServiceURI.create(srcBrokerUrl);
+ var uri = serviceUri.getUri();
+ return InetSocketAddress.createUnresolved(uri.getHost(),
uri.getPort());
+ }
+
+ private String getDstBrokerLookupUrl(TopicName topicName) throws Exception
{
+ var srcBrokerUrl = admin.lookups().lookupTopic(topicName.toString());
+ return getAllBrokers().stream().
+ filter(pulsarService -> !Objects.equals(srcBrokerUrl,
pulsarService.getBrokerServiceUrl())).
+ map(PulsarService::getLookupServiceAddress).
+ findAny().orElseThrow(() -> new Exception("Could not determine
destination broker lookup URL"));
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void proxySetup() throws Exception {
+ var proxyConfig = initializeProxyConfig();
+ proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
+ proxyService.start();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void proxyCleanup() throws Exception {
+ if (proxyService != null) {
+ proxyService.close();
+ }
+ }
+
+ @Test(timeOut = TEST_TIMEOUT_MS)
+ public void testProxyProduceConsume() throws Exception {
+ var namespaceName = NamespaceName.get("public", "default");
+ var topicName = TopicName.get(TopicDomain.persistent.toString(),
namespaceName,
+ BrokerTestUtil.newUniqueName("testProxyProduceConsume"));
+
+ @Cleanup("shutdownNow")
+ var threadPool = Executors.newCachedThreadPool();
+
+ var producerClientFuture = CompletableFuture.supplyAsync(() ->
createClient(proxyService), threadPool);
+ var consumerClientFuture = CompletableFuture.supplyAsync(() ->
createClient(proxyService), threadPool);
+
+ @Cleanup
+ var producerClient = producerClientFuture.get();
+ @Cleanup
+ var producer =
producerClient.newProducer(Schema.INT32).topic(topicName.toString()).create();
+ LookupService producerLookupServiceSpy = spyField(producerClient,
"lookup");
+
+ @Cleanup
+ var consumerClient = consumerClientFuture.get();
+ @Cleanup
+ var consumer =
consumerClient.newConsumer(Schema.INT32).topic(topicName.toString()).
+
subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).
+ subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
+ ackTimeout(1000, TimeUnit.MILLISECONDS).
+ subscribe();
+ LookupService consumerLookupServiceSpy = spyField(consumerClient,
"lookup");
+
+ var bundleRange = admin.lookups().getBundleRange(topicName.toString());
+
+ var semSend = new Semaphore(0);
+ var messagesBeforeUnload = 100;
+ var messagesAfterUnload = 100;
+
+ var pendingMessageIds = Collections.synchronizedSet(new
HashSet<Integer>());
+ var producerFuture = CompletableFuture.runAsync(() -> {
+ try {
+ for (int i = 0; i < messagesBeforeUnload +
messagesAfterUnload; i++) {
+ semSend.acquire();
+ pendingMessageIds.add(i);
+ producer.send(i);
+ }
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ var consumerFuture = CompletableFuture.runAsync(() -> {
+ while (!producerFuture.isDone() || !pendingMessageIds.isEmpty()) {
+ try {
+ var recvMessage = consumer.receive(1_500,
TimeUnit.MILLISECONDS);
+ if (recvMessage != null) {
+ consumer.acknowledge(recvMessage);
+ pendingMessageIds.remove(recvMessage.getValue());
+ }
+ } catch (PulsarClientException e) {
+ // Retry
+ }
+ }
+ }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ var dstBrokerLookupUrl = getDstBrokerLookupUrl(topicName);
+ semSend.release(messagesBeforeUnload);
+ admin.namespaces().unloadNamespaceBundle(namespaceName.toString(),
bundleRange, dstBrokerLookupUrl);
+ semSend.release(messagesAfterUnload);
+
+ // Verify all futures completed successfully.
+ producerFuture.get();
+ consumerFuture.get();
+
+ verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
+ verify(producerLookupServiceSpy, never()).getBroker(topicName);
+
+ verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
+ verify(consumerLookupServiceSpy, never()).getBroker(topicName);
+ }
+
+ @Test(timeOut = TEST_TIMEOUT_MS)
+ public void testClientReconnectsToBrokerOnProxyClosing() throws Exception {
+ var namespaceName = NamespaceName.get("public", "default");
+ var topicName = TopicName.get(TopicDomain.persistent.toString(),
namespaceName,
+
BrokerTestUtil.newUniqueName("testClientReconnectsToBrokerOnProxyClosing"));
+
+ @Cleanup("shutdownNow")
+ var threadPool = Executors.newCachedThreadPool();
+
+ var producerClientFuture = CompletableFuture.supplyAsync(() ->
createClient(proxyService), threadPool);
+ var consumerClientFuture = CompletableFuture.supplyAsync(() ->
createClient(proxyService), threadPool);
+
+ @Cleanup
+ var producerClient = producerClientFuture.get();
+ @Cleanup
+ var producer = (ProducerImpl<Integer>)
producerClient.newProducer(Schema.INT32).topic(topicName.toString()).
+ create();
+ LookupService producerLookupServiceSpy = spyField(producerClient,
"lookup");
+ when(((ServiceNameResolver) spyField(producerLookupServiceSpy,
"serviceNameResolver")).resolveHost()).
+ thenCallRealMethod().then(invocation ->
getSourceBrokerInetAddress(topicName));
+
+ @Cleanup
+ var consumerClient = consumerClientFuture.get();
+ @Cleanup
+ var consumer = (ConsumerImpl<Integer>)
consumerClient.newConsumer(Schema.INT32).topic(topicName.toString()).
+
subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).
+ subscriptionName(BrokerTestUtil.newUniqueName("my-sub")).
+ ackTimeout(1000, TimeUnit.MILLISECONDS).
+ subscribe();
+ LookupService consumerLookupServiceSpy = spyField(consumerClient,
"lookup");
+ when(((ServiceNameResolver) spyField(consumerLookupServiceSpy,
"serviceNameResolver")).resolveHost()).
+ thenCallRealMethod().then(invocation ->
getSourceBrokerInetAddress(topicName));
+
+ var bundleRange = admin.lookups().getBundleRange(topicName.toString());
+
+ var semSend = new Semaphore(0);
+ var messagesPerPhase = 100;
+ var phases = 4;
+ var totalMessages = messagesPerPhase * phases;
+ var cdlSentMessages = new CountDownLatch(messagesPerPhase * 2);
+
+ var pendingMessageIds = Collections.synchronizedSet(new
HashSet<Integer>());
+ var producerFuture = CompletableFuture.runAsync(() -> {
+ try {
+ for (int i = 0; i < totalMessages; i++) {
+ semSend.acquire();
+ pendingMessageIds.add(i);
+ producer.send(i);
+ cdlSentMessages.countDown();
+ }
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ var consumerFuture = CompletableFuture.runAsync(() -> {
+ while (!producerFuture.isDone() || !pendingMessageIds.isEmpty()) {
+ try {
+ var recvMessage = consumer.receive(1_500,
TimeUnit.MILLISECONDS);
+ if (recvMessage != null) {
+ consumer.acknowledge(recvMessage);
+ pendingMessageIds.remove(recvMessage.getValue());
+ }
+ } catch (PulsarClientException e) {
+ // Retry
+ }
+ }
+ }, threadPool).orTimeout(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ var dstBrokerLookupUrl = getDstBrokerLookupUrl(topicName);
+ semSend.release(messagesPerPhase);
+ admin.namespaces().unloadNamespaceBundle(namespaceName.toString(),
bundleRange, dstBrokerLookupUrl);
+ semSend.release(messagesPerPhase);
+
+ cdlSentMessages.await();
+
assertEquals(FieldUtils.readDeclaredField(producer.getConnectionHandler(),
"useProxy", true), Boolean.TRUE);
+
assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(),
"useProxy", true), Boolean.TRUE);
+ semSend.release(messagesPerPhase);
+ proxyService.close();
+ proxyService = null;
+ semSend.release(messagesPerPhase);
+
+ // Verify produce/consume futures completed successfully.
+ producerFuture.get();
+ consumerFuture.get();
+
+
assertEquals(FieldUtils.readDeclaredField(producer.getConnectionHandler(),
"useProxy", true), Boolean.FALSE);
+
assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(),
"useProxy", true), Boolean.FALSE);
+
+ verify(producerClient, times(1)).getProxyConnection(any(), anyInt());
+ verify(producerLookupServiceSpy, times(1)).getBroker(topicName);
+
+ verify(consumerClient, times(1)).getProxyConnection(any(), anyInt());
+ verify(consumerLookupServiceSpy, times(1)).getBroker(topicName);
+ }
+}