This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3d2e6ce84b6e69667a1c2095b766d9941a258b61
Author: Lari Hotari <lhot...@apache.org>
AuthorDate: Wed Feb 9 19:39:11 2022 +0200

    [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)
    
    (cherry picked from commit 640b4e6ec14d9b812da608037c58b664e6778637)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml                                            |   1 +
 .../ProxySaslAuthenticationTest.java               |   1 +
 .../pulsar/common/protocol/PulsarHandler.java      |   2 +-
 pulsar-proxy/pom.xml                               |  13 +-
 .../proxy/server/BrokerDiscoveryProvider.java      |  10 ++
 .../pulsar/proxy/server/BrokerProxyValidator.java  | 181 +++++++++++++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    | 154 ++++++++++--------
 .../pulsar/proxy/server/ProxyConfiguration.java    |  39 +++++
 .../pulsar/proxy/server/ProxyConnection.java       | 101 ++++++++++--
 .../apache/pulsar/proxy/server/ProxyService.java   |  17 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   2 +
 .../proxy/server/ServiceChannelInitializer.java    |   8 +
 .../proxy/server/TargetAddressDeniedException.java |  26 +++
 .../proxy/server/AuthedAdminProxyHandlerTest.java  |   1 +
 .../proxy/server/BrokerProxyValidatorTest.java     | 102 ++++++++++++
 .../proxy/server/ProxyAdditionalServletTest.java   |   1 +
 .../ProxyAuthenticatedProducerConsumerTest.java    |   1 +
 .../proxy/server/ProxyAuthenticationTest.java      |   1 +
 .../pulsar/proxy/server/ProxyConnectionTest.java   |  38 +++++
 .../server/ProxyConnectionThrottlingTest.java      |   1 +
 .../server/ProxyEnableHAProxyProtocolTest.java     |   1 +
 .../proxy/server/ProxyForwardAuthDataTest.java     |   1 +
 .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java |   1 +
 .../server/ProxyKeyStoreTlsTestWithoutAuth.java    |   1 +
 .../proxy/server/ProxyLookupThrottlingTest.java    |   1 +
 .../pulsar/proxy/server/ProxyParserTest.java       |   1 +
 .../proxy/server/ProxyRolesEnforcementTest.java    |   1 +
 .../proxy/server/ProxyServiceStarterTest.java      |   5 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |  15 +-
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |   1 +
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |   1 +
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |   1 +
 .../pulsar/proxy/server/ProxyTlsTestWithAuth.java  |   1 +
 .../server/ProxyWithAuthorizationNegTest.java      |   1 +
 .../proxy/server/ProxyWithAuthorizationTest.java   |   2 +
 .../server/ProxyWithJwtAuthorizationTest.java      |   1 +
 .../server/ProxyWithoutServiceDiscoveryTest.java   |   1 +
 .../SuperUserAuthedAdminProxyHandlerTest.java      |   1 +
 .../server/UnauthedAdminProxyHandlerTest.java      |   1 +
 40 files changed, 655 insertions(+), 85 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index cd71b38..4653b71 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -522,6 +522,8 @@ The Apache Software License, Version 2.0
     - com.google.http-client-google-http-client-1.38.0.jar
     - com.google.auto.value-auto-value-annotations-1.7.4.jar
     - com.google.re2j-re2j-1.5.jar
+  * IPAddress
+    - com.github.seancfoley-ipaddress-5.3.3.jar
 
 BSD 3-clause "New" or "Revised" License
  * Google auth library
diff --git a/pom.xml b/pom.xml
index 1075d57..27dcb4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -195,6 +195,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <cron-utils.version>9.1.6</cron-utils.version>
     <spring-context.version>5.3.15</spring-context.version>
     <apache-http-client.version>4.5.13</apache-http-client.version>
+    <seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
 
     <!-- test dependencies -->
     <cassandra.version>3.6.0</cassandra.version>
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index aabdcf8..e7b2c44 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -224,6 +224,7 @@ public class ProxySaslAuthenticationTest extends 
ProducerConsumerBase {
                ProxyConfiguration proxyConfig = new ProxyConfiguration();
                proxyConfig.setAuthenticationEnabled(true);
                proxyConfig.setServicePort(Optional.of(0));
+               proxyConfig.setBrokerProxyAllowedTargetPorts("*");
                proxyConfig.setWebServicePort(Optional.of(0));
                proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
                proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + 
".*");
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index cdf372d..48e7ffa 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -118,7 +118,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
         }
     }
 
-    protected void cancelKeepAliveTask() {
+    public void cancelKeepAliveTask() {
         if (keepAliveTask != null) {
             keepAliveTask.cancel(false);
             keepAliveTask = null;
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 39aecb5..aa2f463 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -19,7 +19,7 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar</groupId>
@@ -174,6 +174,17 @@
       <groupId>com.beust</groupId>
       <artifactId>jcommander</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.seancfoley</groupId>
+      <artifactId>ipaddress</artifactId>
+      <version>${seancfoley.ipaddress.version}</version>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index ae8e134..c549b45 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +78,15 @@ public class BrokerDiscoveryProvider implements Closeable {
     }
 
     /**
+     * Access the list of available brokers.
+     * @return the list of available brokers
+     * @throws PulsarServerException
+     */
+    public List<? extends ServiceLookupData> getAvailableBrokers() throws 
PulsarServerException {
+        return metadataStoreCacheLoader.getAvailableBrokers();
+    }
+
+    /**
      * Find next broker {@link LoadManagerReport} in round-robin fashion.
      *
      * @return
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
new file mode 100644
index 0000000..debe1f7
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
+import inet.ipaddr.ipv4.IPv4Address;
+import inet.ipaddr.ipv6.IPv6Address;
+import io.netty.resolver.AddressResolver;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringTokenizer;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
+
+@Slf4j
+public class BrokerProxyValidator {
+    private static final String SEPARATOR = "\\s*,\\s*";
+    private static final String ALLOW_ANY = "*";
+    private final int[] allowedTargetPorts;
+    private final boolean allowAnyTargetPort;
+    private final List<IPAddress> allowedIPAddresses;
+    private final boolean allowAnyIPAddress;
+    private final AddressResolver<InetSocketAddress> inetSocketAddressResolver;
+    private final List<Pattern> allowedHostNames;
+    private final boolean allowAnyHostName;
+
+    public BrokerProxyValidator(AddressResolver<InetSocketAddress> 
inetSocketAddressResolver, String allowedHostNames,
+                                String allowedIPAddresses, String 
allowedTargetPorts) {
+        this.inetSocketAddressResolver = inetSocketAddressResolver;
+        List<String> allowedHostNamesStrings = 
parseCommaSeparatedConfigValue(allowedHostNames);
+        if (allowedHostNamesStrings.contains(ALLOW_ANY)) {
+            this.allowAnyHostName = true;
+            this.allowedHostNames = Collections.emptyList();
+        } else {
+            this.allowAnyHostName = false;
+            this.allowedHostNames = allowedHostNamesStrings.stream()
+                    
.map(BrokerProxyValidator::parseWildcardPattern).collect(Collectors.toList());
+        }
+        List<String> allowedIPAddressesStrings = 
parseCommaSeparatedConfigValue(allowedIPAddresses);
+        if (allowedIPAddressesStrings.contains(ALLOW_ANY)) {
+            allowAnyIPAddress = true;
+            this.allowedIPAddresses = Collections.emptyList();
+        } else {
+            allowAnyIPAddress = false;
+            this.allowedIPAddresses = 
allowedIPAddressesStrings.stream().map(IPAddressString::new)
+                    .filter(ipAddressString -> {
+                        if (ipAddressString.isValid()) {
+                            return true;
+                        } else {
+                            throw new IllegalArgumentException("Invalid IP 
address filter '" + ipAddressString + "'",
+                                    
ipAddressString.getAddressStringException());
+                        }
+                    }).map(IPAddressString::getAddress)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+        }
+        List<String> allowedTargetPortsStrings = 
parseCommaSeparatedConfigValue(allowedTargetPorts);
+        if (allowedTargetPortsStrings.contains(ALLOW_ANY)) {
+            allowAnyTargetPort = true;
+            this.allowedTargetPorts = new int[0];
+        } else {
+            allowAnyTargetPort = false;
+            this.allowedTargetPorts =
+                    
allowedTargetPortsStrings.stream().mapToInt(Integer::parseInt).toArray();
+        }
+    }
+
+    private static Pattern parseWildcardPattern(String wildcardPattern) {
+        String regexPattern =
+                Collections.list(new StringTokenizer(wildcardPattern, "*", 
true))
+                        .stream()
+                        .map(String::valueOf)
+                        .map(token -> {
+                            if ("*".equals(token)) {
+                                return ".*";
+                            } else {
+                                return Pattern.quote(token);
+                            }
+                        }).collect(Collectors.joining());
+        return Pattern.compile(
+                "^" + regexPattern + "$",
+                Pattern.CASE_INSENSITIVE);
+    }
+
+    private static List<String> parseCommaSeparatedConfigValue(String 
configValue) {
+        return 
Arrays.stream(configValue.split(SEPARATOR)).map(String::trim).filter(s -> 
s.length() > 0)
+                .collect(Collectors.toList());
+    }
+
+    public CompletableFuture<InetSocketAddress> 
resolveAndCheckTargetAddress(String hostAndPort) {
+        int pos = hostAndPort.indexOf(':');
+        String host = hostAndPort.substring(0, pos);
+        int port = Integer.parseInt(hostAndPort.substring(pos + 1));
+        if (!isPortAllowed(port)) {
+            return FutureUtil.failedFuture(
+                    new TargetAddressDeniedException("Given port in '" + 
hostAndPort + "' isn't allowed."));
+        } else if (!isHostAllowed(host)) {
+            return FutureUtil.failedFuture(
+                    new TargetAddressDeniedException("Given host in '" + 
hostAndPort + "' isn't allowed."));
+        } else {
+            return NettyFutureUtil.toCompletableFuture(
+                            
inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host, 
port)))
+                    .thenCompose(resolvedAddress -> {
+                        CompletableFuture<InetSocketAddress> result = new 
CompletableFuture();
+                        if (isIPAddressAllowed(resolvedAddress)) {
+                            result.complete(resolvedAddress);
+                        } else {
+                            result.completeExceptionally(new 
TargetAddressDeniedException(
+                                    "The IP address of the given host and port 
'" + hostAndPort + "' isn't allowed."));
+                        }
+                        return result;
+                    });
+        }
+    }
+
+    private boolean isPortAllowed(int port) {
+        if (allowAnyTargetPort) {
+            return true;
+        }
+        for (int allowedPort : allowedTargetPorts) {
+            if (allowedPort == port) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isIPAddressAllowed(InetSocketAddress resolvedAddress) {
+        if (allowAnyIPAddress) {
+            return true;
+        }
+        byte[] addressBytes = resolvedAddress.getAddress().getAddress();
+        IPAddress candidateAddress =
+                addressBytes.length == 4 ? new IPv4Address(addressBytes) : new 
IPv6Address(addressBytes);
+        for (IPAddress allowedAddress : allowedIPAddresses) {
+            if (allowedAddress.contains(candidateAddress)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isHostAllowed(String host) {
+        if (allowAnyHostName) {
+            return true;
+        }
+        boolean matched = false;
+        for (Pattern allowedHostName : allowedHostNames) {
+            if (allowedHostName.matcher(host).matches()) {
+                matched = true;
+                break;
+            }
+        }
+        return matched;
+    }
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index c896be5..37fc3d5 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -39,6 +39,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
 import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -49,6 +50,7 @@ import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import javax.net.ssl.SSLSession;
@@ -73,25 +75,25 @@ import org.slf4j.LoggerFactory;
 public class DirectProxyHandler {
 
     @Getter
-    private Channel inboundChannel;
+    private final Channel inboundChannel;
     @Getter
     Channel outboundChannel;
     @Getter
     private final Rate inboundChannelRequestsRate;
     protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new 
ConcurrentHashMap<>();
-    private String originalPrincipal;
-    private AuthData clientAuthData;
-    private String clientAuthMethod;
-    private int protocolVersion;
+    private final String originalPrincipal;
+    private final AuthData clientAuthData;
+    private final String clientAuthMethod;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
-    private final Supplier<SslHandler> sslHandlerSupplier;
     private AuthenticationDataProvider authenticationDataProvider;
-    private ProxyService service;
+    private final ProxyService service;
+    private final Runnable onHandshakeCompleteAction;
 
     public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection, String targetBrokerUrl,
-            int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
+                              InetSocketAddress targetBrokerAddress, int 
protocolVersion,
+                              Supplier<SslHandler> sslHandlerSupplier) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -99,8 +101,7 @@ public class DirectProxyHandler {
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
-        this.protocolVersion = protocolVersion;
-        this.sslHandlerSupplier = sslHandlerSupplier;
+        this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -109,13 +110,22 @@ public class DirectProxyHandler {
         // switches when passing data between the 2
         // connections
         b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+        int brokerProxyConnectTimeoutMs = 
service.getConfiguration().getBrokerProxyConnectTimeoutMs();
+        if (brokerProxyConnectTimeoutMs > 0) {
+            b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
brokerProxyConnectTimeoutMs);
+        }
         
b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ,
 false);
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
-            protected void initChannel(SocketChannel ch) throws Exception {
+            protected void initChannel(SocketChannel ch) {
                 if (sslHandlerSupplier != null) {
                     ch.pipeline().addLast(TLS_HANDLER, 
sslHandlerSupplier.get());
                 }
+                int brokerProxyReadTimeoutMs = 
service.getConfiguration().getBrokerProxyReadTimeoutMs();
+                if (brokerProxyReadTimeoutMs > 0) {
+                    ch.pipeline().addLast("readTimeoutHandler",
+                            new ReadTimeoutHandler(brokerProxyReadTimeoutMs, 
TimeUnit.MILLISECONDS));
+                }
                 ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(
                     Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
                 ch.pipeline().addLast("proxyOutboundHandler", new 
ProxyBackendHandler(config, protocolVersion));
@@ -133,7 +143,7 @@ public class DirectProxyHandler {
             return;
         }
 
-        ChannelFuture f = b.connect(targetBroker.getHost(), 
targetBroker.getPort());
+        ChannelFuture f = b.connect(targetBrokerAddress);
         outboundChannel = f.channel();
         f.addListener(future -> {
             if (!future.isSuccess()) {
@@ -211,8 +221,8 @@ public class DirectProxyHandler {
         private BackendState state = BackendState.Init;
         private String remoteHostName;
         protected ChannelHandlerContext ctx;
-        private ProxyConfiguration config;
-        private int protocolVersion;
+        private final ProxyConfiguration config;
+        private final int protocolVersion;
 
         public ProxyBackendHandler(ProxyConfiguration config, int 
protocolVersion) {
             this.config = config;
@@ -225,7 +235,7 @@ public class DirectProxyHandler {
             // Send the Connect command to broker
             authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
             AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-            ByteBuf command = null;
+            ByteBuf command;
             command = Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, 
clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command);
@@ -293,12 +303,11 @@ public class DirectProxyHandler {
                 outboundChannel.read();
             } catch (Exception e) {
                 log.error("Error mutual verify", e);
-                return;
             }
         }
 
         @Override
-        public void operationComplete(Future<Void> future) throws Exception {
+        public void operationComplete(Future<Void> future) {
             // This is invoked when the write operation on the paired 
connection
             // is completed
             if (future.isSuccess()) {
@@ -317,6 +326,7 @@ public class DirectProxyHandler {
 
         @Override
         protected void handleConnected(CommandConnected connected) {
+            checkArgument(state == BackendState.Init, "Unexpected state %s. 
BackendState.Init was expected.", state);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Received Connected from broker", 
inboundChannel, outboundChannel);
             }
@@ -332,58 +342,68 @@ public class DirectProxyHandler {
 
             state = BackendState.HandshakeCompleted;
 
-            ChannelFuture channelFuture;
-            if (connected.hasMaxMessageSize()) {
-                channelFuture = inboundChannel.writeAndFlush(
-                    Commands.newConnected(connected.getProtocolVersion(), 
connected.getMaxMessageSize()));
-            } else {
-                channelFuture = 
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
-            }
+            onHandshakeCompleteAction.run();
+            startDirectProxying(connected);
+
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? 
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(),
 maxMessageSize))
+                    .addListener(future -> {
+                        if (future.isSuccess()) {
+                            // Start reading from both connections
+                            inboundChannel.read();
+                            outboundChannel.read();
+                        } else {
+                            log.warn("[{}] [{}] Failed to write to inbound 
connection. Closing both connections.",
+                                    inboundChannel,
+                                    outboundChannel, future.cause());
+                            inboundChannel.close();
+                        }
+                    });
+        }
 
-            channelFuture.addListener(future -> {
-                if (service.getProxyLogLevel() == 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] [{}] Removing decoder from pipeline", 
inboundChannel, outboundChannel);
-                    }
-                    // direct tcp proxy
-                    inboundChannel.pipeline().remove("frameDecoder");
-                    outboundChannel.pipeline().remove("frameDecoder");
+        private void startDirectProxying(CommandConnected connected) {
+            if (service.getProxyLogLevel() == 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Removing decoder from pipeline", 
inboundChannel, outboundChannel);
+                }
+                // direct tcp proxy
+                inboundChannel.pipeline().remove("frameDecoder");
+                outboundChannel.pipeline().remove("frameDecoder");
+            } else {
+                // Enable parsing feature, proxyLogLevel(1 or 2)
+                // Add parser handler
+                if (connected.hasMaxMessageSize()) {
+                    inboundChannel.pipeline()
+                            .replace("frameDecoder", "newFrameDecoder",
+                                    new 
LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+                                            + 
Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                            0, 4, 0, 4));
+                    outboundChannel.pipeline().replace("frameDecoder", 
"newFrameDecoder",
+                            new LengthFieldBasedFrameDecoder(
+                                    connected.getMaxMessageSize()
+                                            + 
Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                    0, 4, 0, 4));
+
+                    inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
+                            new ParserProxyHandler(service, inboundChannel,
+                                    ParserProxyHandler.FRONTEND_CONN,
+                                    connected.getMaxMessageSize()));
+                    
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                            new ParserProxyHandler(service, outboundChannel,
+                                    ParserProxyHandler.BACKEND_CONN,
+                                    connected.getMaxMessageSize()));
                 } else {
-                    // Enable parsing feature, proxyLogLevel(1 or 2)
-                    // Add parser handler
-                    if (connected.hasMaxMessageSize()) {
-                        inboundChannel.pipeline().replace("frameDecoder", 
"newFrameDecoder",
-                                                          new 
LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
-                                                                               
            + Commands.MESSAGE_SIZE_FRAME_PADDING,
-                                                                               
            0, 4, 0, 4));
-                        outboundChannel.pipeline().replace("frameDecoder", 
"newFrameDecoder",
-                                                           new 
LengthFieldBasedFrameDecoder(
-                                                               
connected.getMaxMessageSize()
-                                                               + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-
-                        inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
-                                                            new 
ParserProxyHandler(service, inboundChannel,
-                                                                               
    ParserProxyHandler.FRONTEND_CONN,
-                                                                               
    connected.getMaxMessageSize()));
-                        
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                                                             new 
ParserProxyHandler(service, outboundChannel,
-                                                                               
     ParserProxyHandler.BACKEND_CONN,
-                                                                               
     connected.getMaxMessageSize()));
-                    } else {
-                        inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
-                                                            new 
ParserProxyHandler(service, inboundChannel,
-                                                                               
    ParserProxyHandler.FRONTEND_CONN,
-                                                                               
    Commands.DEFAULT_MAX_MESSAGE_SIZE));
-                        
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                                                             new 
ParserProxyHandler(service, outboundChannel,
-                                                                               
     ParserProxyHandler.BACKEND_CONN,
-                                                                               
     Commands.DEFAULT_MAX_MESSAGE_SIZE));
-                    }
+                    inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
+                            new ParserProxyHandler(service, inboundChannel,
+                                    ParserProxyHandler.FRONTEND_CONN,
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                    
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                            new ParserProxyHandler(service, outboundChannel,
+                                    ParserProxyHandler.BACKEND_CONN,
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
                 }
-                // Start reading from both connections
-                inboundChannel.read();
-                outboundChannel.read();
-            });
+            }
         }
 
         @Override
@@ -404,7 +424,7 @@ public class DirectProxyHandler {
         private boolean verifyTlsHostName(String hostname, 
ChannelHandlerContext ctx) {
             ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
 
-            SSLSession sslSession = null;
+            SSLSession sslSession;
             if (sslHandler != null) {
                 sslSession = ((SslHandler) sslHandler).engine().getSession();
                 return (new TlsHostnameVerifier()).verify(hostname, 
sslSession);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index a50ac70..fd56e18 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -49,6 +49,8 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     @Category
     private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery";
     @Category
+    private static final String CATEGORY_BROKER_PROXY = "Broker Proxy";
+    @Category
     private static final String CATEGORY_AUTHENTICATION = "Proxy 
Authentication";
     @Category
     private static final String CATEGORY_AUTHORIZATION = "Proxy Authorization";
@@ -136,6 +138,43 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private String functionWorkerWebServiceURLTLS;
 
+    @FieldContext(category = CATEGORY_BROKER_PROXY,
+            doc = "When enabled, checks that the target broker is active 
before connecting. "
+                    + "zookeeperServers and configurationStoreServers must be 
configured in proxy configuration "
+                    + "for retrieving the active brokers.")
+    private boolean checkActiveBrokers = false;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Broker proxy connect timeout.\n"
+                    + "The timeout value for Broker proxy connect timeout is 
in millisecond. Set to 0 to disable."
+    )
+    private int brokerProxyConnectTimeoutMs = 10000;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Broker proxy read timeout.\n"
+                    + "The timeout value for Broker proxy read timeout is in 
millisecond. Set to 0 to disable."
+    )
+    private int brokerProxyReadTimeoutMs = 75000;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target host names. "
+                    + "Supports multiple comma separated entries and a 
wildcard.")
+    private String brokerProxyAllowedHostNames = "*";
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target ip addresses or ip networks / 
netmasks. "
+                    + "Supports multiple comma separated entries.")
+    private String brokerProxyAllowedIPAddresses = "*";
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target ports")
+    private String brokerProxyAllowedTargetPorts = "6650,6651";
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Hostname or IP address the service binds on"
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index ff392ca..df99aca 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,7 +31,7 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
-import io.netty.handler.codec.haproxy.HAProxyMessage;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -50,6 +53,7 @@ import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,11 +70,12 @@ import lombok.Getter;
  *
  */
 public class ProxyConnection extends PulsarHandler implements 
FutureListener<Void> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProxyConnection.class);
     // ConnectionPool is used by the proxy to issue lookup requests
     private ConnectionPool connectionPool;
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, 
Long.MAX_VALUE / 2));
-    private ProxyService service;
+    private final ProxyService service;
     AuthenticationDataSource authenticationData;
     private State state;
     private final Supplier<SslHandler> sslHandlerSupplier;
@@ -78,6 +83,7 @@ public class ProxyConnection extends PulsarHandler implements 
FutureListener<Voi
     private LookupProxyHandler lookupProxyHandler = null;
     @Getter
     private DirectProxyHandler directProxyHandler = null;
+    private final BrokerProxyValidator brokerProxyValidator;
     String clientAuthRole;
     AuthData clientAuthData;
     String clientAuthMethod;
@@ -109,6 +115,8 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
         // looking into it
         ProxyConnectionToBroker,
 
+        Closing,
+
         Closed,
     }
 
@@ -121,6 +129,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
         this.service = proxyService;
         this.state = State.Init;
         this.sslHandlerSupplier = sslHandlerSupplier;
+        this.brokerProxyValidator = service.getBrokerProxyValidator();
     }
 
     @Override
@@ -128,6 +137,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
         super.channelRegistered(ctx);
         ProxyService.activeConnections.inc();
         if (ProxyService.activeConnections.get() > 
service.getConfiguration().getMaxConcurrentInboundConnections()) {
+            state = State.Closing;
             ctx.close();
             ProxyService.rejectedConnections.inc();
         }
@@ -173,6 +183,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+        state = State.Closing;
         super.exceptionCaught(ctx, cause);
         LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, 
cause.getClass().getSimpleName(), cause.getMessage(),
                 ClientCnx.isKnownException(cause) ? null : cause);
@@ -211,7 +222,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
     }
 
     @Override
-    public void operationComplete(Future<Void> future) throws Exception {
+    public void operationComplete(Future<Void> future) {
         // This is invoked when the write operation on the paired connection is
         // completed
         if (future.isSuccess()) {
@@ -247,14 +258,51 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
         }
 
         LOG.info("[{}] complete connection, init proxy handler. authenticated 
with {} role {}, hasProxyToBrokerUrl: {}",
-            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
+                remoteAddress, authMethod, clientAuthRole, 
hasProxyToBrokerUrl);
         if (hasProxyToBrokerUrl) {
-            // Client already knows which broker to connect. Let's open a
-            // connection there and just pass bytes in both directions
-            state = State.ProxyConnectionToBroker;
-            directProxyHandler = new DirectProxyHandler(service, this, 
proxyToBrokerUrl,
-                protocolVersionToAdvertise, sslHandlerSupplier);
-            cancelKeepAliveTask();
+            // Optimize proxy connection to fail-fast if the target broker 
isn't active
+            // Pulsar client will retry connecting after a back off timeout
+            if (service.getConfiguration().isCheckActiveBrokers()
+                    && !isBrokerActive(proxyToBrokerUrl)) {
+                state = State.Closing;
+                LOG.warn("[{}] Target broker '{}' isn't available. 
authenticated with {} role {}.",
+                        remoteAddress, proxyToBrokerUrl, authMethod, 
clientAuthRole);
+                ctx()
+                        .writeAndFlush(
+                                Commands.newError(-1, 
ServerError.ServiceNotReady, "Target broker isn't available."))
+                        .addListener(future -> ctx().close());
+                return;
+            }
+
+            brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
+                    .thenAccept(address -> ctx().executor().submit(() -> {
+                        // Client already knows which broker to connect. Let's 
open a
+                        // connection there and just pass bytes in both 
directions
+                        state = State.ProxyConnectionToBroker;
+                        directProxyHandler = new DirectProxyHandler(service, 
this, proxyToBrokerUrl, address,
+                                protocolVersionToAdvertise, 
sslHandlerSupplier);
+                    }))
+                    .exceptionally(throwable -> {
+                        if (throwable instanceof TargetAddressDeniedException
+                                || throwable.getCause() instanceof 
TargetAddressDeniedException) {
+                            TargetAddressDeniedException 
targetAddressDeniedException =
+                                    (TargetAddressDeniedException) (throwable 
instanceof TargetAddressDeniedException
+                                            ? throwable : 
throwable.getCause());
+
+                            LOG.warn("[{}] Target broker '{}' cannot be 
validated. {}. authenticated with {} role {}.",
+                                    remoteAddress, proxyToBrokerUrl, 
targetAddressDeniedException.getMessage(),
+                                    authMethod, clientAuthRole);
+                        } else {
+                            LOG.error("[{}] Error validating target broker 
'{}'. authenticated with {} role {}.",
+                                    remoteAddress, proxyToBrokerUrl, 
authMethod, clientAuthRole, throwable);
+                        }
+                        ctx()
+                                .writeAndFlush(
+                                        Commands.newError(-1, 
ServerError.ServiceNotReady,
+                                                "Target broker cannot be 
validated."))
+                                .addListener(future -> ctx().close());
+                        return null;
+                    });
         } else {
             // Client is doing a lookup, we can consider the handshake complete
             // and we'll take care of just topics and
@@ -306,6 +354,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
 
         if (getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v10.getValue()) {
             LOG.warn("[{}] Client doesn't support connecting through proxy", 
remoteAddress);
+            state = State.Closing;
             ctx.close();
             return;
         }
@@ -485,6 +534,36 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
         return haProxyMessage;
     }
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ProxyConnection.class);
+    private boolean isBrokerActive(String targetBrokerHostPort) {
+        for (ServiceLookupData serviceLookupData : getAvailableBrokers()) {
+            if (matchesHostAndPort("pulsar://", 
serviceLookupData.getPulsarServiceUrl(), targetBrokerHostPort)
+                    || matchesHostAndPort("pulsar+ssl://", 
serviceLookupData.getPulsarServiceUrlTls(),
+                    targetBrokerHostPort)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private List<? extends ServiceLookupData> getAvailableBrokers() {
+        if (service.getDiscoveryProvider() == null) {
+            LOG.warn("Unable to retrieve active brokers. 
service.getDiscoveryProvider() is null."
+                    + "zookeeperServers and configurationStoreServers must be 
configured in proxy configuration "
+                    + "when checkActiveBrokers is enabled.");
+            return Collections.emptyList();
+        }
+        try {
+            return service.getDiscoveryProvider().getAvailableBrokers();
+        } catch (PulsarServerException e) {
+            LOG.error("Unable to get available brokers", e);
+            return Collections.emptyList();
+        }
+    }
 
+    static boolean matchesHostAndPort(String expectedPrefix, String 
pulsarServiceUrl, String brokerHostPort) {
+        return pulsarServiceUrl != null
+                && pulsarServiceUrl.length() == expectedPrefix.length() + 
brokerHostPort.length()
+                && pulsarServiceUrl.startsWith(expectedPrefix)
+                && pulsarServiceUrl.startsWith(brokerHostPort, 
expectedPrefix.length());
+    }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 75f922d..302ceba 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -26,6 +26,8 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
@@ -73,6 +75,10 @@ public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
     private final Authentication proxyClientAuthentication;
+    @Getter
+    private final DnsNameResolver dnsNameResolver;
+    @Getter
+    private final BrokerProxyValidator brokerProxyValidator;
     private String serviceUrl;
     private String serviceUrlTls;
     private ConfigurationMetadataCacheService configurationCacheService;
@@ -147,6 +153,15 @@ public class ProxyService implements Closeable {
                 false, workersThreadFactory);
         this.authenticationService = authenticationService;
 
+        DnsNameResolverBuilder dnsNameResolverBuilder = new 
DnsNameResolverBuilder(workerGroup.next())
+                
.channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+        dnsNameResolver = dnsNameResolverBuilder.build();
+
+        brokerProxyValidator = new 
BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
+                proxyConfig.getBrokerProxyAllowedHostNames(),
+                proxyConfig.getBrokerProxyAllowedIPAddresses(),
+                proxyConfig.getBrokerProxyAllowedTargetPorts());
+
         statsExecutor = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("proxy-stats-executor"));
         statsExecutor.schedule(()->{
@@ -234,6 +249,8 @@ public class ProxyService implements Closeable {
     }
 
     public void close() throws IOException {
+        dnsNameResolver.close();
+
         if (discoveryProvider != null) {
             discoveryProvider.close();
         }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 86ce236..6bbcc88 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -27,6 +27,7 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.install;
 import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
 
 import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -88,6 +89,7 @@ public class ProxyServiceStarter {
 
     private ProxyConfiguration config;
 
+    @Getter
     private ProxyService proxyService;
 
     private WebServer server;
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 658dd87..a033a87 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.proxy.server;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -46,6 +48,7 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
     private final ProxyService proxyService;
     private final boolean enableTls;
     private final boolean tlsEnabledWithKeyStore;
+    private final int brokerProxyReadTimeoutMs;
 
     private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
     private SslContextAutoRefreshBuilder<SslContext> clientSslCtxRefresher;
@@ -58,6 +61,7 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
         this.proxyService = proxyService;
         this.enableTls = enableTls;
         this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
+        this.brokerProxyReadTimeoutMs = 
serviceConfig.getBrokerProxyReadTimeoutMs();
 
         if (enableTls) {
             if (tlsEnabledWithKeyStore) {
@@ -127,6 +131,10 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
             ch.pipeline().addLast(TLS_HANDLER,
                     new 
SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
         }
+        if (brokerProxyReadTimeoutMs > 0) {
+            ch.pipeline().addLast("readTimeoutHandler",
+                    new ReadTimeoutHandler(brokerProxyReadTimeoutMs, 
TimeUnit.MILLISECONDS));
+        }
         if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
             ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new 
OptionalProxyProtocolDecoder());
         }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
new file mode 100644
index 0000000..e62525f
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+class TargetAddressDeniedException extends RuntimeException {
+    public TargetAddressDeniedException(String message) {
+        super(message);
+    }
+}
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 545912d..88de4b3 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -83,6 +83,7 @@ public class AuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setAuthenticationEnabled(true);
         proxyConfig.setAuthorizationEnabled(true);
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
new file mode 100644
index 0000000..8e45755
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import io.netty.resolver.AddressResolver;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.SucceededFuture;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.apache.curator.shaded.com.google.common.net.InetAddresses;
+import org.testng.annotations.Test;
+
+public class BrokerProxyValidatorTest {
+
+    @Test
+    public void shouldAllowValidInput() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost"
+                , "1.2.0.0/16"
+                , "6650");
+        InetSocketAddress inetSocketAddress = 
brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+        assertNotNull(inetSocketAddress);
+        assertEquals(inetSocketAddress.getAddress().getHostAddress(), 
"1.2.3.4");
+        assertEquals(inetSocketAddress.getPort(), 6650);
+    }
+
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".*Given host in 'myhost:6650' 
isn't allowed.")
+    public void shouldPreventInvalidHostName() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "allowedhost"
+                , "1.2.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".* The IP address of the given 
host and port 'myhost:6650' isn't allowed.")
+    public void shouldPreventInvalidIPAddress() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost"
+                , "1.3.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    @Test
+    public void shouldSupportHostNamePattern() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*.mydomain"
+                , "1.2.0.0/16"
+                , "6650");
+        
brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    @Test
+    public void shouldAllowAllWithWildcard() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*"
+                , "*"
+                , "6650");
+        
brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    private AddressResolver<InetSocketAddress> 
createMockedAddressResolver(String ipAddressResult) {
+        AddressResolver<InetSocketAddress> inetSocketAddressResolver = 
mock(AddressResolver.class);
+        when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock 
-> {
+            InetSocketAddress address = (InetSocketAddress) 
invocationOnMock.getArgument(0);
+            return new 
SucceededFuture<SocketAddress>(mock(EventExecutor.class),
+                    new 
InetSocketAddress(InetAddresses.forString(ipAddressResult), address.getPort()));
+        });
+        return inetSocketAddressResolver;
+    }
+}
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index a909a9f..94009c8 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -72,6 +72,7 @@ public class ProxyAdditionalServletTest extends 
MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index e63d3ae..b37dedf 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -106,6 +106,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 57aa781..25eee72 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -216,6 +216,7 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                ProxyConfiguration proxyConfig = new ProxyConfiguration();
                proxyConfig.setAuthenticationEnabled(true);
                proxyConfig.setServicePort(Optional.of(0));
+               proxyConfig.setBrokerProxyAllowedTargetPorts("*");
                proxyConfig.setWebServicePort(Optional.of(0));
                proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
new file mode 100644
index 0000000..5f533e3
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+public class ProxyConnectionTest {
+
+    @Test
+    public void testMatchesHostAndPort() {
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:6650", 
"1.2.3.4:6650"));
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar+ssl://", 
"pulsar+ssl://1.2.3.4:6650", "1.2.3.4:6650"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", 
"5.6.7.8:1234"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", 
"1.2.3.4:1234"));
+    }
+}
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 062db18..128d33f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -53,6 +53,7 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 44403fb..496b3ca 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -56,6 +56,7 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
         internalSetup();
 
         proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         proxyConfig.setHaProxyProtocolEnabled(true);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index cf61dac..aa84755 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -104,6 +104,7 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
index af76bfa..f1cb69f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -78,6 +78,7 @@ public class ProxyKeyStoreTlsTestWithAuth extends 
MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
index 9b0e9b4..03d0b2b 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -73,6 +73,7 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends 
MockedPulsarServiceBaseTest
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index fa3c485..5145026 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -52,6 +52,7 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 905ca20..654686d 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -71,6 +71,7 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         //enable full parsing feature
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 9ae3fbc..39446af 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -209,6 +209,7 @@ public class ProxyRolesEnforcementTest extends 
ProducerConsumerBase {
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index bdba8d3..62b65d3 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -51,6 +51,7 @@ public class ProxyServiceStarterTest extends 
MockedPulsarServiceBaseTest {
     static final String[] ARGS = new String[]{"-c", 
"./src/test/resources/proxy.conf"};
 
     private ProxyServiceStarter serviceStarter;
+    private String serviceUrl;
 
     @Override
     @BeforeClass
@@ -62,7 +63,9 @@ public class ProxyServiceStarterTest extends 
MockedPulsarServiceBaseTest {
         serviceStarter.getConfig().setWebServicePort(Optional.of(0));
         serviceStarter.getConfig().setServicePort(Optional.of(0));
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+        serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
         serviceStarter.start();
+        serviceUrl = serviceStarter.getProxyService().getServiceUrl();
     }
 
     @Override
@@ -92,7 +95,7 @@ public class ProxyServiceStarterTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testProducer() throws Exception {
         @Cleanup
-        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
this.pulsar.getBrokerService().getListenPort().get())
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
                 .build();
 
         @Cleanup
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 7e6c0f5..742cfbb 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -52,6 +52,8 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
     private final String TLS_PROXY_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/server-cert.pem";
     private final String TLS_PROXY_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/server-key.pem";
     private ProxyServiceStarter serviceStarter;
+    private String serviceUrl;
+    private int webPort;
 
     @Override
     @BeforeClass
@@ -62,12 +64,17 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
         
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
         
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
-        serviceStarter.getConfig().setServicePortTls(Optional.of(11043));
+        serviceStarter.getConfig().setServicePort(Optional.empty());
+        serviceStarter.getConfig().setServicePortTls(Optional.of(0));
+        serviceStarter.getConfig().setWebServicePort(Optional.of(0));
         serviceStarter.getConfig().setTlsEnabledWithBroker(true);
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
         
serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
         serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
         serviceStarter.start();
+        serviceUrl = serviceStarter.getProxyService().getServiceUrlTls();
+        webPort = serviceStarter.getServer().getListenPortHTTP().get();
     }
 
     protected void doInitConf() throws Exception {
@@ -86,7 +93,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testProducer() throws Exception {
         @Cleanup
-        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043")
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
                 
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                 .build();
 
@@ -106,7 +113,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
         WebSocketClient producerWebSocketClient = new 
WebSocketClient(producerClient);
         producerWebSocketClient.start();
         MyWebSocket producerSocket = new MyWebSocket();
-        String produceUri = 
"ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+        String produceUri = "ws://localhost:" + webPort + 
"/ws/producer/persistent/sample/test/local/websocket-topic";
         Future<Session> producerSession = 
producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
 
         ProducerMessage produceRequest = new ProducerMessage();
@@ -117,7 +124,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
         WebSocketClient consumerWebSocketClient = new 
WebSocketClient(consumerClient);
         consumerWebSocketClient.start();
         MyWebSocket consumerSocket = new MyWebSocket();
-        String consumeUri = 
"ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+        String consumeUri = "ws://localhost:" + webPort + 
"/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
         Future<Session> consumerSession = 
consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
         
consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
         
producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 2b1c22c..1859c24 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -67,6 +67,7 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 92f6a63..a90243f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -90,6 +90,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 59beb94..5081d0e 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -56,6 +56,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest 
{
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
index 0d3d3a0..ece35cf 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
@@ -58,6 +58,7 @@ public class ProxyTlsTestWithAuth extends 
MockedPulsarServiceBaseTest {
         writer.close();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 5d05867..b9d9b04 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -113,6 +113,7 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 14c7288..d813777 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -184,6 +184,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
@@ -402,6 +403,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 693e4ca..6178454 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -98,6 +98,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
 
         // enable auth&auth and use JWT at proxy
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index f20401c..59c50de 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -104,6 +104,7 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index 7dc927a..342df28 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -80,6 +80,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBas
         proxyConfig.setAuthenticationEnabled(true);
         proxyConfig.setAuthorizationEnabled(true);
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 628f7cc..ee18b60 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -68,6 +68,7 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
 
         // start proxy service
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
         proxyConfig.setStatusFilePath(STATUS_FILE_PATH);

Reply via email to