This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e2863391e7f [fix][client] Broker address resolution wrong if connect
through a multi-dns names proxy (#19597)
e2863391e7f is described below
commit e2863391e7f6f9b6c5060f0f78378493f8df37f3
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Feb 23 10:12:10 2023 +0100
[fix][client] Broker address resolution wrong if connect through a
multi-dns names proxy (#19597)
---
.../pulsar/client/impl/ConnectionPoolTest.java | 89 +++++++++++++++++++++-
.../apache/pulsar/client/impl/ConnectionPool.java | 24 ++++--
.../client/impl/PulsarChannelInitializer.java | 8 +-
3 files changed, 106 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index e8816894513..fb564bd5083 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -20,12 +20,17 @@ package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.AbstractAddressResolver;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
+import io.netty.util.concurrent.Promise;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -66,7 +71,7 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
List<InetSocketAddress> result = new ArrayList<>();
result.add(new InetSocketAddress("127.0.0.1", brokerPort));
Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
- brokerPort)))
+ brokerPort)))
.thenReturn(CompletableFuture.completedFuture(result));
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
@@ -107,7 +112,7 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
InetSocketAddress brokerAddress =
- InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
+ InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
IntStream.range(1, 5).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
@@ -119,6 +124,7 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
pool.closeAllConnections();
pool.close();
+ eventLoop.shutdownGracefully();
}
@Test
@@ -129,7 +135,7 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
InetSocketAddress brokerAddress =
- InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
+ InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
IntStream.range(1, 10).forEach(i -> {
pool.getConnection(brokerAddress).thenAccept(cnx -> {
Assert.assertTrue(cnx.channel().isActive());
@@ -141,5 +147,82 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
pool.closeAllConnections();
pool.close();
+ eventLoop.shutdownGracefully();
+ }
+
+
+ @Test
+ public void testSetProxyToTargetBrokerAddress() throws Exception {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setConnectionsPerBroker(5);
+
+
+ EventLoopGroup eventLoop =
+ EventLoopUtil.newEventLoopGroup(8, false,
+ new DefaultThreadFactory("test"));
+
+ final AbstractAddressResolver resolver = new
AbstractAddressResolver(eventLoop.next()) {
+ @Override
+ protected boolean doIsResolved(SocketAddress socketAddress) {
+ return !((InetSocketAddress) socketAddress).isUnresolved();
+ }
+
+ @Override
+ protected void doResolve(SocketAddress socketAddress, Promise
promise) throws Exception {
+ promise.setFailure(new IllegalStateException());
+ throw new IllegalStateException();
+ }
+
+ @Override
+ protected void doResolveAll(SocketAddress socketAddress, Promise
promise) throws Exception {
+ final InetSocketAddress socketAddress1 = (InetSocketAddress)
socketAddress;
+ final boolean isProxy =
socketAddress1.getHostName().equals("proxy");
+ final boolean isBroker =
socketAddress1.getHostName().equals("broker");
+ if (!isProxy && !isBroker) {
+ promise.setFailure(new IllegalStateException());
+ throw new IllegalStateException();
+ }
+ List<InetSocketAddress> result = new ArrayList<>();
+ if (isProxy) {
+ result.add(new InetSocketAddress("localhost", brokerPort));
+ result.add(InetSocketAddress.createUnresolved("proxy",
brokerPort));
+ } else {
+ result.add(new InetSocketAddress("127.0.0.1", brokerPort));
+ result.add(InetSocketAddress.createUnresolved("broker",
brokerPort));
+ }
+ promise.setSuccess(result);
+ }
+ };
+
+ ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop,
+ (Supplier<ClientCnx>) () -> new ClientCnx(conf, eventLoop),
Optional.of(resolver));
+
+
+ ClientCnx cnx = pool.getConnection(
+ InetSocketAddress.createUnresolved("proxy", 9999),
+ InetSocketAddress.createUnresolved("proxy", 9999)).get();
+ Assert.assertEquals(cnx.remoteHostName, "proxy");
+ Assert.assertNull(cnx.proxyToTargetBrokerAddress);
+ cnx.close();
+
+ cnx = pool.getConnection(
+ InetSocketAddress.createUnresolved("broker", 9999),
+ InetSocketAddress.createUnresolved("proxy", 9999)).get();
+ Assert.assertEquals(cnx.remoteHostName, "proxy");
+ Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999");
+ cnx.close();
+
+
+ cnx = pool.getConnection(
+ InetSocketAddress.createUnresolved("broker", 9999),
+ InetSocketAddress.createUnresolved("broker", 9999)).get();
+ Assert.assertEquals(cnx.remoteHostName, "broker");
+ Assert.assertNull(cnx.proxyToTargetBrokerAddress);
+ cnx.close();
+
+
+ pool.closeAllConnections();
+ pool.close();
+ eventLoop.shutdownGracefully();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 2e105b53284..3a9a2b9b7ab 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -305,8 +305,12 @@ public class ConnectionPool implements AutoCloseable {
resolvedAddress = resolveName(unresolvedPhysicalAddress);
}
return resolvedAddress.thenCompose(
- inetAddresses ->
connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
- isSniProxy ? unresolvedPhysicalAddress : null));
+ inetAddresses -> connectToResolvedAddresses(
+ logicalAddress,
+ unresolvedPhysicalAddress,
+ inetAddresses.iterator(),
+ isSniProxy ? unresolvedPhysicalAddress : null)
+ );
} catch (URISyntaxException e) {
log.error("Invalid Proxy url {}",
clientConfig.getProxyServiceUrl(), e);
return FutureUtil
@@ -319,17 +323,19 @@ public class ConnectionPool implements AutoCloseable {
* address is working.
*/
private CompletableFuture<Channel>
connectToResolvedAddresses(InetSocketAddress logicalAddress,
+
InetSocketAddress unresolvedPhysicalAddress,
Iterator<InetSocketAddress> resolvedPhysicalAddress,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// Successfully connected to server
- connectToAddress(logicalAddress, resolvedPhysicalAddress.next(),
sniHost)
+ connectToAddress(logicalAddress, resolvedPhysicalAddress.next(),
unresolvedPhysicalAddress, sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (resolvedPhysicalAddress.hasNext()) {
// Try next IP address
- connectToResolvedAddresses(logicalAddress,
resolvedPhysicalAddress, sniHost)
+ connectToResolvedAddresses(logicalAddress,
unresolvedPhysicalAddress,
+ resolvedPhysicalAddress, sniHost)
.thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive
call
@@ -362,20 +368,24 @@ public class ConnectionPool implements AutoCloseable {
* Attempt to establish a TCP connection to an already resolved single IP
address.
*/
private CompletableFuture<Channel> connectToAddress(InetSocketAddress
logicalAddress,
- InetSocketAddress
physicalAddress, InetSocketAddress sniHost) {
+ InetSocketAddress
physicalAddress,
+ InetSocketAddress
unresolvedPhysicalAddress,
+ InetSocketAddress
sniHost) {
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost :
physicalAddress))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(ch ->
- channelInitializerHandler.initializeClientCnx(ch,
logicalAddress, physicalAddress))
+ channelInitializerHandler.initializeClientCnx(ch,
logicalAddress,
+ unresolvedPhysicalAddress))
.thenCompose(channel ->
toCompletableFuture(channel.connect(physicalAddress)));
} else {
return toCompletableFuture(bootstrap.register())
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(ch ->
- channelInitializerHandler.initializeClientCnx(ch,
logicalAddress, physicalAddress))
+ channelInitializerHandler.initializeClientCnx(ch,
logicalAddress,
+ unresolvedPhysicalAddress))
.thenCompose(channel ->
toCompletableFuture(channel.connect(physicalAddress)));
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index e01b53b8ef1..ed34f7d41c1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -213,7 +213,7 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
CompletableFuture<Channel> initializeClientCnx(Channel ch,
InetSocketAddress
logicalAddress,
- InetSocketAddress
resolvedPhysicalAddress) {
+ InetSocketAddress
unresolvedPhysicalAddress) {
return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() ->
{
final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");
@@ -221,15 +221,13 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
throw new IllegalStateException("Missing ClientCnx. This
should not happen.");
}
- // Need to do our own equality because the physical address is
resolved already
- if
(!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
- && logicalAddress.getPort() ==
resolvedPhysicalAddress.getPort())) {
+ if (!logicalAddress.equals(unresolvedPhysicalAddress)) {
// We are connecting through a proxy. We need to set the
target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
cnx.setTargetBroker(logicalAddress);
}
- cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());
+ cnx.setRemoteHostName(unresolvedPhysicalAddress.getHostString());
return ch;
}));