This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b805a4ad45427a143d54013ad92035be06d8c641
Author: Yuri Mizushima <[email protected]>
AuthorDate: Wed Aug 14 11:09:50 2024 +0900

    [improve][proxy] Reuse authentication instance in pulsar-proxy (#23113)
    
    (cherry picked from commit 3e461c004ea229ef9b526a51fd0ed91e8157e873)
---
 .../ProxySaslAuthenticationTest.java               |  6 +-
 .../pulsar/proxy/server/AdminProxyHandler.java     | 23 ++----
 .../pulsar/proxy/server/DirectProxyHandler.java    |  4 +-
 .../apache/pulsar/proxy/server/ProxyService.java   | 12 +--
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 46 +++++++++--
 .../extensions/SimpleProxyExtensionTestBase.java   | 12 ++-
 .../server/AdminProxyHandlerKeystoreTLSTest.java   | 25 ++++--
 .../pulsar/proxy/server/AdminProxyHandlerTest.java |  3 +-
 .../proxy/server/AuthedAdminProxyHandlerTest.java  | 15 +++-
 .../proxy/server/FunctionWorkerRoutingTest.java    | 10 ++-
 .../InvalidProxyConfigForAuthorizationTest.java    |  3 +-
 .../proxy/server/ProxyAdditionalServletTest.java   | 55 +++++++------
 .../ProxyAuthenticatedProducerConsumerTest.java    | 11 ++-
 .../proxy/server/ProxyAuthenticationTest.java      |  7 +-
 .../server/ProxyConnectionThrottlingTest.java      | 11 ++-
 .../proxy/server/ProxyDisableZeroCopyTest.java     | 25 +-----
 .../server/ProxyEnableHAProxyProtocolTest.java     | 12 ++-
 .../proxy/server/ProxyForwardAuthDataTest.java     | 10 ++-
 .../pulsar/proxy/server/ProxyIsAHttpProxyTest.java | 60 +++++++++++---
 .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java | 12 ++-
 .../server/ProxyKeyStoreTlsTestWithoutAuth.java    | 12 ++-
 .../server/ProxyKeyStoreTlsTransportTest.java      | 12 ++-
 .../proxy/server/ProxyLookupThrottlingTest.java    | 14 +++-
 .../pulsar/proxy/server/ProxyMutualTlsTest.java    | 12 ++-
 .../pulsar/proxy/server/ProxyParserTest.java       | 11 ++-
 .../proxy/server/ProxyPrometheusMetricsTest.java   | 15 +++-
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  | 12 ++-
 .../proxy/server/ProxyRolesEnforcementTest.java    |  9 ++-
 .../proxy/server/ProxyServiceStarterTest.java      | 91 ++++++++++++++++++++++
 .../apache/pulsar/proxy/server/ProxyStatsTest.java | 14 +++-
 .../proxy/server/ProxyStuckConnectionTest.java     | 12 ++-
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 28 +++++--
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   | 12 ++-
 .../pulsar/proxy/server/ProxyTlsTestWithAuth.java  | 12 ++-
 .../server/ProxyWithAuthorizationNegTest.java      | 10 ++-
 .../proxy/server/ProxyWithAuthorizationTest.java   | 21 ++++-
 .../server/ProxyWithJwtAuthorizationTest.java      | 28 +++++--
 .../server/ProxyWithoutServiceDiscoveryTest.java   | 11 ++-
 .../SuperUserAuthedAdminProxyHandlerTest.java      | 15 +++-
 .../server/UnauthedAdminProxyHandlerTest.java      | 16 +++-
 40 files changed, 570 insertions(+), 159 deletions(-)

diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index f0e45aa734a..c4ccc31142c 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -258,7 +258,11 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
                proxyConfig.setForwardAuthorizationCredentials(true);
                AuthenticationService authenticationService = new 
AuthenticationService(
                         PulsarConfigurationLoader.convertFrom(proxyConfig));
-               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService);
+               @Cleanup
+               final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                               
proxyConfig.getBrokerClientAuthenticationParameters());
+               proxyClientAuthentication.start();
+               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
 
                proxyService.start();
                final String proxyServiceUrl = "pulsar://localhost:" + 
proxyService.getListenPort().get();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index caaa99c5d40..0108b770249 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import javax.net.ssl.SSLContext;
@@ -40,7 +39,6 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.KeyStoreParams;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.SecurityUtility;
@@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet {
 
     private final ProxyConfiguration config;
     private final BrokerDiscoveryProvider discoveryProvider;
+    private final Authentication proxyClientAuthentication;
     private final String brokerWebServiceUrl;
     private final String functionWorkerWebServiceUrl;
 
-    AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider) {
+    AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider,
+                      Authentication proxyClientAuthentication) {
         this.config = config;
         this.discoveryProvider = discoveryProvider;
+        this.proxyClientAuthentication = proxyClientAuthentication;
         this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? 
config.getBrokerWebServiceURLTLS()
                 : config.getBrokerWebServiceURL();
         this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? 
config.getFunctionWorkerWebServiceURLTLS()
@@ -256,22 +257,13 @@ class AdminProxyHandler extends ProxyServlet {
     @Override
     protected HttpClient newHttpClient() {
         try {
-            Authentication auth = AuthenticationFactory.create(
-                config.getBrokerClientAuthenticationPlugin(),
-                config.getBrokerClientAuthenticationParameters()
-            );
-
-            Objects.requireNonNull(auth, "No supported auth found for proxy");
-
-            auth.start();
-
             if (config.isTlsEnabledWithBroker()) {
                 try {
                     X509Certificate[] trustCertificates = SecurityUtility
                         
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());
 
                     SSLContext sslCtx;
-                    AuthenticationDataProvider authData = auth.getAuthData();
+                    AuthenticationDataProvider authData = 
proxyClientAuthentication.getAuthData();
                     if (config.isBrokerClientTlsEnabledWithKeyStore()) {
                         KeyStoreParams params = authData.hasDataForTls() ? 
authData.getTlsKeyStoreParams() : null;
                         sslCtx = KeyStoreSSLContext.createClientSslContext(
@@ -311,11 +303,6 @@ class AdminProxyHandler extends ProxyServlet {
                     return new JettyHttpClient(contextFactory);
                 } catch (Exception e) {
                     LOG.error("new jetty http client exception ", e);
-                    try {
-                        auth.close();
-                    } catch (IOException ioe) {
-                        LOG.error("Failed to close the authentication 
service", ioe);
-                    }
                     throw new 
PulsarClientException.InvalidConfigurationException(e.getMessage());
                 }
             }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index d63b04b6734..4678db82c6e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -52,7 +52,6 @@ import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
@@ -114,8 +113,7 @@ public class DirectProxyHandler {
 
             if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
                 try {
-                    authData = 
AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
-                            
config.getBrokerClientAuthenticationParameters()).getAuthData();
+                    authData = authentication.getAuthData();
                 } catch (PulsarClientException e) {
                     throw new RuntimeException(e);
                 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 64885413320..6ee9f5bcdfd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -63,8 +63,6 @@ import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -152,7 +150,8 @@ public class ProxyService implements Closeable {
     private final ConnectionController connectionController;
 
     public ProxyService(ProxyConfiguration proxyConfig,
-                        AuthenticationService authenticationService) throws 
Exception {
+                        AuthenticationService authenticationService,
+                        Authentication proxyClientAuthentication) throws 
Exception {
         requireNonNull(proxyConfig);
         this.proxyConfig = proxyConfig;
         this.clientCnxs = Sets.newConcurrentHashSet();
@@ -201,12 +200,7 @@ public class ProxyService implements Closeable {
             });
         }, 60, TimeUnit.SECONDS);
         this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
-        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
-            proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                    proxyConfig.getBrokerClientAuthenticationParameters());
-        } else {
-            proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
-        }
+        this.proxyClientAuthentication = proxyClientAuthentication;
         this.connectionController = new 
ConnectionController.DefaultConnectionController(
                 proxyConfig.getMaxConcurrentInboundConnections(),
                 proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index cba7b4b6ad5..19811c90a4f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -31,11 +31,13 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
 import io.prometheus.client.hotspot.DefaultExports;
+import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.Objects;
 import java.util.function.Consumer;
 import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
@@ -45,6 +47,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import 
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -101,6 +107,9 @@ public class ProxyServiceStarter {
 
     private ProxyConfiguration config;
 
+    @Getter
+    private Authentication proxyClientAuthentication;
+
     @Getter
     private ProxyService proxyService;
 
@@ -241,8 +250,27 @@ public class ProxyServiceStarter {
     public void start() throws Exception {
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(config));
+
+        if (config.getBrokerClientAuthenticationPlugin() != null) {
+            proxyClientAuthentication = 
AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
+                    config.getBrokerClientAuthenticationParameters());
+            Objects.requireNonNull(proxyClientAuthentication, "No supported 
auth found for proxy");
+            try {
+                proxyClientAuthentication.start();
+            } catch (Exception e) {
+                try {
+                    proxyClientAuthentication.close();
+                } catch (IOException ioe) {
+                    log.error("Failed to close the authentication service", 
ioe);
+                }
+                throw new 
PulsarClientException.InvalidConfigurationException(e.getMessage());
+            }
+        } else {
+            proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
+        }
+
         // create proxy service
-        proxyService = new ProxyService(config, authenticationService);
+        proxyService = new ProxyService(config, authenticationService, 
proxyClientAuthentication);
         // create a web-service
         server = new WebServer(config, authenticationService);
 
@@ -289,7 +317,8 @@ public class ProxyServiceStarter {
             metricsInitialized = true;
         }
 
-        addWebServerHandlers(server, config, proxyService, 
proxyService.getDiscoveryProvider());
+        addWebServerHandlers(server, config, proxyService, 
proxyService.getDiscoveryProvider(),
+                proxyClientAuthentication);
 
         // start web-service
         server.start();
@@ -303,6 +332,9 @@ public class ProxyServiceStarter {
             if (server != null) {
                 server.stop();
             }
+            if (proxyClientAuthentication != null) {
+                proxyClientAuthentication.close();
+            }
         } catch (Exception e) {
             log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
         } finally {
@@ -313,9 +345,10 @@ public class ProxyServiceStarter {
     }
 
     public static void addWebServerHandlers(WebServer server,
-                                     ProxyConfiguration config,
-                                     ProxyService service,
-                                     BrokerDiscoveryProvider 
discoveryProvider) throws Exception {
+                                            ProxyConfiguration config,
+                                            ProxyService service,
+                                            BrokerDiscoveryProvider 
discoveryProvider,
+                                            Authentication 
proxyClientAuthentication) throws Exception {
         // We can make 'status.html' publicly accessible without 
authentication since
         // it does not contain any sensitive data.
         server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, 
config.getStatusFilePath(),
@@ -332,7 +365,8 @@ public class ProxyServiceStarter {
             }
         }
 
-        AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, 
discoveryProvider);
+        AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, 
discoveryProvider,
+                proxyClientAuthentication);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         server.addServlet("/admin", servletHolder);
         server.addServlet("/lookup", servletHolder);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index 79662097c3b..4f5436dccd6 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -26,6 +26,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -121,6 +123,7 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
     private ProxyService proxyService;
     private boolean useSeparateThreadPoolForProxyExtensions;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     public SimpleProxyExtensionTestBase(boolean 
useSeparateThreadPoolForProxyExtensions) {
         this.useSeparateThreadPoolForProxyExtensions = 
useSeparateThreadPoolForProxyExtensions;
@@ -141,8 +144,12 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -172,6 +179,9 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
 
         if (tempDirectory != null) {
             FileUtils.deleteDirectory(tempDirectory);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index d6796b7eaa6..b4be7bebb83 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -18,11 +18,18 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -34,18 +41,13 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
 public class AdminProxyHandlerKeystoreTLSTest extends 
MockedPulsarServiceBaseTest {
 
 
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
+    private Authentication proxyClientAuthentication;
+
     private WebServer webServer;
 
     private BrokerDiscoveryProvider discoveryProvider;
@@ -103,12 +105,16 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes
 
         resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
                 new ZKMetadataStore(mockZooKeeperGlobal));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig)));
         discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, 
resource));
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), 
brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
-        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider));
+        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
         webServer.start();
@@ -118,6 +124,9 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes
     @Override
     protected void cleanup() throws Exception {
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         super.internalCleanup();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
index becebe0059e..4f925618e8a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
@@ -32,6 +32,7 @@ import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.client.api.Authentication;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.testng.Assert;
@@ -46,7 +47,7 @@ public class AdminProxyHandlerTest {
         // given
         HttpClient httpClient = mock(HttpClient.class);
         adminProxyHandler = new 
AdminProxyHandler(mock(ProxyConfiguration.class),
-                mock(BrokerDiscoveryProvider.class)) {
+                mock(BrokerDiscoveryProvider.class), 
mock(Authentication.class)) {
             @Override
             protected HttpClient createHttpClient() throws ServletException {
                 return httpClient;
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index af70276aed9..840bfe17fe4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -20,18 +20,17 @@ package org.apache.pulsar.proxy.server;
 
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-
 import java.util.Optional;
-
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -51,6 +50,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
     private static final Logger LOG = 
LoggerFactory.getLogger(AuthedAdminProxyHandlerTest.class);
 
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private WebServer webServer;
     private BrokerDiscoveryProvider discoveryProvider;
     private PulsarResources resource;
@@ -100,13 +100,17 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
 
         resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
                 new ZKMetadataStore(mockZooKeeperGlobal));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           
PulsarConfigurationLoader.convertFrom(proxyConfig)));
         discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, 
resource));
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), 
brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
 
-        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider));
+        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 
@@ -118,6 +122,9 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
     @Override
     protected void cleanup() throws Exception {
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         super.internalCleanup();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
index db5e9e12bd2..a07a0f082d3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,8 +40,13 @@ public class FunctionWorkerRoutingTest {
         proxyConfig.setBrokerWebServiceURL(brokerUrl);
         proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl);
 
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         BrokerDiscoveryProvider discoveryProvider = 
mock(BrokerDiscoveryProvider.class);
-        AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, 
discoveryProvider);
+        AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, 
discoveryProvider, proxyClientAuthentication);
 
         String funcUrl = 
handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test"));
         Assert.assertEquals(funcUrl, 
String.format("%s/admin/v3/functions/%s/%s",
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
index c29bfaa9648..b7ef0855e38 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -33,7 +34,7 @@ public class InvalidProxyConfigForAuthorizationTest {
         proxyConfiguration.setAuthorizationEnabled(true);
         proxyConfiguration.setAuthenticationEnabled(false);
         try (ProxyService proxyService = new ProxyService(proxyConfiguration,
-                Mockito.mock(AuthenticationService.class))) {
+                Mockito.mock(AuthenticationService.class), 
Mockito.mock(Authentication.class))) {
             proxyService.start();
             fail("An exception should have been thrown");
         } catch (Exception e) {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index 17cd3c33e79..67cf5aca911 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -18,18 +18,36 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import javax.servlet.Servlet;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.OkHttpClient;
 import okhttp3.Response;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
 import 
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
-import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.mockito.Mockito;
@@ -38,24 +56,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import javax.servlet.Servlet;
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletOutputStream;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.mockito.Mockito.doReturn;
-import static org.testng.Assert.assertEquals;
-
 @Slf4j
 public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
 
@@ -65,6 +65,7 @@ public class ProxyAdditionalServletTest extends 
MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private WebServer proxyWebServer;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -82,8 +83,13 @@ public class ProxyAdditionalServletTest extends 
MockedPulsarServiceBaseTest {
         // this is for nar package test
 //        addServletNar();
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
-                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+                proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -97,7 +103,7 @@ public class ProxyAdditionalServletTest extends 
MockedPulsarServiceBaseTest {
         mockAdditionalServlet();
 
         proxyWebServer = new WebServer(proxyConfig, authService);
-        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         proxyWebServer.start();
     }
 
@@ -177,6 +183,9 @@ public class ProxyAdditionalServletTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index bfe86f86976..ab97bf05201 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -35,6 +35,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -74,6 +75,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private final String configClusterName = "test";
 
     @BeforeMethod
@@ -138,8 +140,12 @@ public class ProxyAuthenticatedProducerConsumerTest 
extends ProducerConsumerBase
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
         proxyService.start();
@@ -150,6 +156,9 @@ public class ProxyAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 9c8e5197adf..3207c2c3d6a 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -43,6 +43,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -233,7 +234,11 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                 AuthenticationService authenticationService = new 
AuthenticationService(
                         PulsarConfigurationLoader.convertFrom(proxyConfig));
                @Cleanup
-               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService);
+               final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                               
proxyConfig.getBrokerClientAuthenticationParameters());
+               proxyClientAuthentication.start();
+               @Cleanup
+               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
 
                proxyService.start();
                final String proxyServiceUrl = proxyService.getServiceUrl();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 336f11ae19d..5d950d847ef 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -27,6 +27,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.limiter.ConnectionController;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -46,6 +48,7 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
     private final int NUM_CONCURRENT_INBOUND_CONNECTION = 4;
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -59,8 +62,11 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
         
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
         
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -72,6 +78,9 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
     protected void cleanup() throws Exception {
         internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
index 3aa71413d54..6a3992c550f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
@@ -18,32 +18,11 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.mockito.Mockito.doReturn;
-import java.util.Optional;
-import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.mockito.Mockito;
-import org.testng.annotations.BeforeClass;
-
 public class ProxyDisableZeroCopyTest extends ProxyTest {
 
     @Override
-    @BeforeClass
-    protected void setup() throws Exception {
-        internalSetup();
-
-        proxyConfig.setServicePort(Optional.ofNullable(0));
-        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
-        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
-        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+    protected void initializeProxyConfig() throws Exception {
+        super.initializeProxyConfig();
         proxyConfig.setProxyZeroCopyModeEnabled(false);
-
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
-        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
-
-        proxyService.start();
     }
 }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 8b3092c6f51..77212c9535b 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -22,6 +22,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -48,6 +50,7 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -61,8 +64,12 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setHaProxyProtocolEnabled(true);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -75,6 +82,9 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index b7cfb874747..8e3a2fdf264 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -30,6 +30,8 @@ import lombok.Cleanup;
 
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -116,7 +118,11 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
 
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
-        try (ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService)) {
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        try (ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication)) {
             proxyService.start();
             try (PulsarClient proxyClient = 
createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) {
                 
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
@@ -132,7 +138,7 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
 
         @Cleanup
-        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService);
+        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
         proxyService.start();
 
         @Cleanup
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
index 246dd9f85e3..be51bd18323 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
@@ -36,10 +36,12 @@ import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.Response;
-
+import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.eclipse.jetty.client.HttpClient;
@@ -201,10 +203,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -230,10 +236,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         try {
             Response r1 = 
client.target(webServer.getServiceUri()).path("/server1/foobar").request().get();
@@ -261,10 +271,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
 
     }
 
@@ -280,10 +294,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -307,10 +325,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -333,10 +355,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get();
@@ -358,6 +384,10 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         StringBuilder longUri = new StringBuilder("/service3/tp");
         for (int i = 10 * 1024; i > 0; i = i - 11){
@@ -366,7 +396,7 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
 
         WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, 
authService);
         ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, 
proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServerMaxUriLen8k.start();
         try {
             Response r = 
client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get();
@@ -378,7 +408,7 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024);
         WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, 
authService);
         ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, 
proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServerMaxUriLen12k.start();
         try {
             Response r = 
client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get();
@@ -399,10 +429,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -431,10 +465,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
 
         HttpClient httpClient = new HttpClient();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
index 88e7b269d6e..d22c64f977f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -33,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -54,6 +56,7 @@ import org.testng.annotations.Test;
 public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod
@@ -87,9 +90,13 @@ public class ProxyKeyStoreTlsTestWithAuth extends 
MockedPulsarServiceBaseTest {
         providers.add(AuthenticationProviderTls.class.getName());
         proxyConfig.setAuthenticationProviders(providers);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -102,6 +109,9 @@ public class ProxyKeyStoreTlsTestWithAuth extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     protected PulsarClient internalSetUpForClient(boolean addCertificates, 
String lookupUrl) throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
index 5feef74e3b9..050b230ba4d 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -29,6 +29,8 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -50,6 +52,7 @@ import org.testng.annotations.Test;
 public class ProxyKeyStoreTlsTestWithoutAuth extends 
MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod
@@ -75,8 +78,12 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends 
MockedPulsarServiceBaseTest
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -107,6 +114,9 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends 
MockedPulsarServiceBaseTest
     protected void cleanup() throws Exception {
         internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5c4e40ed65a..2a9bc720b79 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -24,6 +24,8 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -40,6 +42,7 @@ import org.testng.annotations.Test;
 public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest 
{
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod
@@ -86,9 +89,13 @@ public class ProxyKeyStoreTlsTransportTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
         proxyConfig.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -101,6 +108,9 @@ public class ProxyKeyStoreTlsTransportTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     protected PulsarClient newClient() throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 1b63aa14dfe..9348a4df4db 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -21,16 +21,15 @@ package org.apache.pulsar.proxy.server;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import java.util.Optional;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
-
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -53,6 +52,7 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
     private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5;
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod(alwaysRun = true)
@@ -68,7 +68,10 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
 
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -82,6 +85,9 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
         if (proxyService != null) {
             proxyService.close();
         }
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test(groups = "quarantine")
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index ad237c25397..a73b2d388f8 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -48,6 +50,7 @@ public class ProxyMutualTlsTest extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -67,8 +70,12 @@ public class ProxyMutualTlsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
         proxyConfig.setTlsAllowInsecureConnection(false);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -81,6 +88,9 @@ public class ProxyMutualTlsTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 82cd702aa7f..74a4435b768 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -64,6 +66,7 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -76,9 +79,12 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         //enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -93,6 +99,9 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
index 6948996ad46..69dcda2a4d7 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -42,6 +42,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.awaitility.Awaitility;
@@ -59,6 +61,7 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private WebServer proxyWebServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -72,8 +75,13 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(TEST_CLUSTER);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
-                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+                proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -85,7 +93,7 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
 
         proxyWebServer = new WebServer(proxyConfig, authService);
-        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         proxyWebServer.start();
     }
 
@@ -108,6 +116,9 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
         if (proxyService != null) {
             proxyService.close();
         }
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index 2f36cc679f1..b058e4af830 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -35,6 +35,8 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -56,6 +58,7 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
 
     private ProxyService proxyService;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     protected void doInitConf() throws Exception {
@@ -125,9 +128,13 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
         properties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
         proxyConfig.setProperties(properties);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                 new AuthenticationService(
-                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                        PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
     }
 
     @AfterClass(alwaysRun = true)
@@ -135,6 +142,9 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     private void startProxy(boolean forwardAuthData) throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 3259cfd95c7..e32f79b0f0f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import javax.naming.AuthenticationException;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -35,6 +36,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -217,9 +219,14 @@ public class ProxyRolesEnforcementTest extends 
ProducerConsumerBase {
         providers.add(BasicAuthenticationProvider.class.getName());
         proxyConfig.setAuthenticationProviders(providers);
 
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         try (ProxyService proxyService = new ProxyService(proxyConfig,
                 new AuthenticationService(
-                        PulsarConfigurationLoader.convertFrom(proxyConfig)))) {
+                        PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication)) {
             proxyService.start();
 
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 759eabdb5a2..e12415b0d91 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -20,16 +20,22 @@ package org.apache.pulsar.proxy.server;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
+import java.util.function.Consumer;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.eclipse.jetty.client.HttpClient;
@@ -155,4 +161,89 @@ public class ProxyServiceStarterTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testProxyClientAuthentication() throws Exception {
+        final Consumer<ProxyConfiguration> initConfig = (proxyConfig) -> {
+            proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+            proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+            proxyConfig.setWebServicePort(Optional.of(0));
+            proxyConfig.setServicePort(Optional.of(0));
+            proxyConfig.setWebSocketServiceEnabled(true);
+            proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+            proxyConfig.setClusterName(configClusterName);
+        };
+
+
+
+        ProxyServiceStarter serviceStarter = new ProxyServiceStarter(ARGS, 
null, true);
+        initConfig.accept(serviceStarter.getConfig());
+        // ProxyServiceStarter will throw an exception when Authentication#start fails
+        
serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication1.class.getName());
+        try {
+            serviceStarter.start();
+            fail("ProxyServiceStarter should throw an exception when 
Authentication#start is failed");
+        } catch (Exception ex) {
+            
assertTrue(ex.getMessage().contains("ExceptionAuthentication1#start"));
+            assertTrue(serviceStarter.getProxyClientAuthentication() 
instanceof ExceptionAuthentication1);
+        }
+
+        serviceStarter = new ProxyServiceStarter(ARGS, null, true);
+        initConfig.accept(serviceStarter.getConfig());
+        // ProxyServiceStarter will throw an exception when both Authentication#start and Authentication#close fail
+        
serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication2.class.getName());
+        try {
+            serviceStarter.start();
+            fail("ProxyServiceStarter should throw an exception when 
Authentication#start and Authentication#close are failed");
+        } catch (Exception ex) {
+            
assertTrue(ex.getMessage().contains("ExceptionAuthentication2#start"));
+            assertTrue(serviceStarter.getProxyClientAuthentication() 
instanceof ExceptionAuthentication2);
+        }
+    }
+
+    public static class ExceptionAuthentication1 implements Authentication {
+
+        @Override
+        public String getAuthMethodName() {
+            return 
"org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication1";
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // no-op
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            throw new PulsarClientException("ExceptionAuthentication1#start");
+        }
+
+        @Override
+        public void close() throws IOException {
+            // no-op
+        }
+    }
+
+    public static class ExceptionAuthentication2 implements Authentication {
+
+        @Override
+        public String getAuthMethodName() {
+            return 
"org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication2";
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // no-op
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            throw new PulsarClientException("ExceptionAuthentication2#start");
+        }
+
+        @Override
+        public void close() throws IOException {
+            throw new IOException("ExceptionAuthentication2#close");
+        }
+    }
+
 }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 140af88aae7..5769571d187 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -38,6 +38,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -61,6 +63,7 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private WebServer proxyWebServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -75,8 +78,12 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
         // enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.of(2));
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
-                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -88,7 +95,7 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
 
         proxyWebServer = new WebServer(proxyConfig, authService);
-        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         proxyWebServer.start();
     }
 
@@ -106,6 +113,9 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
     protected void cleanup() throws Exception {
         internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 97279659af6..5052f598dba 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -28,6 +28,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
@@ -56,6 +58,7 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig;
+    private Authentication proxyClientAuthentication;
     private SocatContainer socatContainer;
 
     private String brokerServiceUriSocat;
@@ -80,6 +83,10 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         startProxyService();
         // use the same port for subsequent restarts
         proxyConfig.setServicePort(proxyService.getListenPort());
@@ -87,7 +94,7 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
 
     private void startProxyService() throws Exception {
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))) {
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication) {
             @Override
             protected LookupProxyHandler newLookupProxyHandler(ProxyConnection 
proxyConnection) {
                 return new TestLookupProxyHandler(this, proxyConnection);
@@ -105,6 +112,9 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
         if (proxyService != null) {
             proxyService.close();
         }
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         if (socatContainer != null) {
             socatContainer.close();
         }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index e799e2e948a..6fab7d6af21 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -38,6 +38,8 @@ import org.apache.avro.reflect.Nullable;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -73,6 +75,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
 
     protected ProxyService proxyService;
     protected ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    protected Authentication proxyClientAuthentication;
 
     @Data
     @ToString
@@ -90,25 +93,38 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
     protected void setup() throws Exception {
         internalSetup();
 
-        proxyConfig.setServicePort(Optional.ofNullable(0));
-        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
-        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
-        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        initializeProxyConfig();
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService)
+                .createConfigurationMetadataStore();
 
         proxyService.start();
     }
 
+    protected void initializeProxyConfig() throws Exception {
+        proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
+        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
+
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+    }
+
     @Override
     @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 64b0cd6b1a6..79a55dea674 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -45,6 +47,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest 
{
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -62,8 +65,12 @@ public class ProxyTlsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -76,6 +83,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest 
{
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
index 0f1fa74a209..2a68d92c9e1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
@@ -27,6 +27,8 @@ import java.util.Optional;
 import org.apache.pulsar.broker.auth.MockOIDCIdentityProvider;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.mockito.Mockito;
@@ -38,6 +40,7 @@ public class ProxyTlsTestWithAuth extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     private MockOIDCIdentityProvider server;
 
@@ -74,8 +77,12 @@ public class ProxyTlsTestWithAuth extends 
MockedPulsarServiceBaseTest {
             " \"audience\": \"an-audience\"," +
             " \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}");
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-            PulsarConfigurationLoader.convertFrom(proxyConfig))));
+            PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
         doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
 
@@ -87,6 +94,9 @@ public class ProxyTlsTestWithAuth extends 
MockedPulsarServiceBaseTest {
     protected void cleanup() throws Exception {
         internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         server.stop();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 2d97a4b06a8..b10fb221600 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -34,6 +34,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -72,6 +73,7 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @BeforeMethod
     @Override
@@ -136,7 +138,10 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
 
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication));
 
         proxyService.start();
     }
@@ -146,6 +151,9 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 31757cc0367..9087e8f477e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -38,6 +38,7 @@ import 
org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -86,6 +87,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
     private ProxyService proxyService;
     private WebServer webServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @DataProvider(name = "hostnameVerification")
     public Object[][] hostnameVerificationCodecProvider() {
@@ -228,7 +230,10 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
 
         AuthenticationService authService =
                 new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, 
proxyClientAuthentication));
         webServer = new WebServer(proxyConfig, authService);
     }
 
@@ -238,11 +243,14 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         super.internalCleanup();
         proxyService.close();
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     private void startProxy() throws Exception {
         proxyService.start();
-        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         webServer.start();
     }
 
@@ -415,7 +423,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
      * This test verifies whether the Client and Proxy honor the protocols and 
ciphers specified. Details description of
      * test cases can be found in protocolsCiphersProviderCodecProvider
      */
-    @Test(dataProvider = "protocolsCiphersProvider", timeOut = 5000)
+    @Test(dataProvider = "protocolsCiphersProvider", timeOut = 10000)
     public void tlsCiphersAndProtocols(Set<String> tlsCiphers, Set<String> 
tlsProtocols, boolean expectFailure)
             throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -455,9 +463,14 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setTlsProtocols(tlsProtocols);
         proxyConfig.setTlsCiphers(tlsCiphers);
 
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
+        @Cleanup
         ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                         new 
AuthenticationService(
-                                                                
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                                
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         try {
             proxyService.start();
         } catch (Exception ex) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 88ecfe8a318..6b1890e2326 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -39,7 +39,16 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -74,6 +83,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
     private ProxyService proxyService;
     private WebServer webServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @BeforeMethod
     @Override
@@ -120,7 +130,10 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
 
         AuthenticationService authService =
                 new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, 
proxyClientAuthentication));
         webServer = new WebServer(proxyConfig, authService);
     }
 
@@ -130,11 +143,14 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         super.internalCleanup();
         proxyService.close();
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     private void startProxy() throws Exception {
         proxyService.start();
-        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         webServer.start();
     }
 
@@ -415,7 +431,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
         final WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         @Cleanup
         final Client client = javax.ws.rs.client.ClientBuilder
@@ -440,7 +456,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setAuthenticateMetricsEndpoint(false);
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         @Cleanup
         Client client = javax.ws.rs.client.ClientBuilder.newClient(new 
ClientConfig().register(LoggingFeature.class));
@@ -453,7 +469,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setAuthenticateMetricsEndpoint(true);
         webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService,
-                new BrokerDiscoveryProvider(proxyConfig, resource));
+                new BrokerDiscoveryProvider(proxyConfig, resource), 
proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/metrics").request().get();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index ec1412b021d..23fdd47c280 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -33,6 +33,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -56,6 +57,7 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class);
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @BeforeMethod
     @Override
@@ -122,9 +124,13 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
 
         proxyConfig.setAuthenticationProviders(providers);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
 
         proxyService.start();
     }
@@ -134,6 +140,9 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index d3291c8fb91..0d510281134 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -20,18 +20,17 @@ package org.apache.pulsar.proxy.server;
 
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-
 import java.util.Optional;
-
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -47,6 +46,7 @@ import org.testng.annotations.Test;
 
 public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private WebServer webServer;
     private BrokerDiscoveryProvider discoveryProvider;
     private PulsarResources resource;
@@ -95,13 +95,17 @@ public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBas
 
         resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
                 new ZKMetadataStore(mockZooKeeperGlobal));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           
PulsarConfigurationLoader.convertFrom(proxyConfig)));
         discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, 
resource));
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), 
brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
 
-        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider));
+        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 
@@ -113,6 +117,9 @@ public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBas
     @Override
     protected void cleanup() throws Exception {
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         super.internalCleanup();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index aa4aeaa2ea8..78ac1e46a15 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -35,6 +35,8 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -49,6 +51,7 @@ import org.testng.annotations.Test;
 public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest 
{
     private final String STATUS_FILE_PATH = 
"./src/test/resources/vip_status.html";
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private WebServer webServer;
     private BrokerDiscoveryProvider discoveryProvider;
     private AdminProxyWrapper adminProxyHandler;
@@ -76,13 +79,17 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           
PulsarConfigurationLoader.convertFrom(proxyConfig)));
 
         resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper),
                 new ZKMetadataStore(mockZooKeeperGlobal));
         discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, 
resource));
-        adminProxyHandler = new AdminProxyWrapper(proxyConfig, 
discoveryProvider);
+        adminProxyHandler = new AdminProxyWrapper(proxyConfig, 
discoveryProvider, proxyClientAuthentication);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
@@ -100,6 +107,9 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     protected void cleanup() throws Exception {
         internalCleanup();
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
@@ -126,8 +136,8 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     static class AdminProxyWrapper extends AdminProxyHandler {
         String rewrittenUrl;
 
-        AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider) {
-            super(config, discoveryProvider);
+        AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider, Authentication proxyClientAuthentication) {
+            super(config, discoveryProvider, proxyClientAuthentication);
         }
 
         @Override

Reply via email to