This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d5833f98f7f [improve][proxy] Reuse authentication instance in pulsar-proxy (#23113)
d5833f98f7f is described below

commit d5833f98f7f5456ea58b42c1803783d03da1ca1d
Author: Yuri Mizushima <[email protected]>
AuthorDate: Wed Aug 14 11:09:50 2024 +0900

    [improve][proxy] Reuse authentication instance in pulsar-proxy (#23113)
---
 .../ProxySaslAuthenticationTest.java               |  6 +-
 .../pulsar/proxy/server/AdminProxyHandler.java     | 23 ++----
 .../pulsar/proxy/server/DirectProxyHandler.java    |  4 +-
 .../apache/pulsar/proxy/server/ProxyService.java   | 12 +--
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 47 +++++++++--
 .../extensions/SimpleProxyExtensionTestBase.java   | 12 ++-
 .../server/AdminProxyHandlerKeystoreTLSTest.java   | 13 +++-
 .../pulsar/proxy/server/AdminProxyHandlerTest.java |  3 +-
 .../proxy/server/AuthedAdminProxyHandlerTest.java  | 12 ++-
 .../proxy/server/FunctionWorkerRoutingTest.java    | 10 ++-
 .../InvalidProxyConfigForAuthorizationTest.java    |  3 +-
 .../proxy/server/ProxyAdditionalServletTest.java   | 15 +++-
 .../ProxyAuthenticatedProducerConsumerTest.java    | 11 ++-
 .../proxy/server/ProxyAuthenticationTest.java      |  7 +-
 .../server/ProxyConnectionThrottlingTest.java      | 11 ++-
 .../proxy/server/ProxyDisableZeroCopyTest.java     |  2 +-
 .../server/ProxyEnableHAProxyProtocolTest.java     | 12 ++-
 .../proxy/server/ProxyForwardAuthDataTest.java     | 10 ++-
 .../pulsar/proxy/server/ProxyIsAHttpProxyTest.java | 59 +++++++++++---
 .../server/ProxyKeyStoreTlsTransportTest.java      | 12 ++-
 .../proxy/server/ProxyKeyStoreTlsWithAuthTest.java | 12 ++-
 .../server/ProxyKeyStoreTlsWithoutAuthTest.java    | 12 ++-
 .../proxy/server/ProxyLookupThrottlingTest.java    | 11 ++-
 .../pulsar/proxy/server/ProxyMutualTlsTest.java    | 12 ++-
 .../pulsar/proxy/server/ProxyParserTest.java       | 11 ++-
 .../proxy/server/ProxyPrometheusMetricsTest.java   | 15 +++-
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  | 12 ++-
 .../proxy/server/ProxyRolesEnforcementTest.java    |  9 ++-
 .../proxy/server/ProxyServiceStarterTest.java      | 91 ++++++++++++++++++++++
 .../apache/pulsar/proxy/server/ProxyStatsTest.java | 14 +++-
 .../proxy/server/ProxyStuckConnectionTest.java     | 12 ++-
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 14 +++-
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   | 12 ++-
 .../pulsar/proxy/server/ProxyTlsWithAuthTest.java  | 12 ++-
 .../server/ProxyWithAuthorizationNegTest.java      | 10 ++-
 .../proxy/server/ProxyWithAuthorizationTest.java   | 19 ++++-
 .../server/ProxyWithExtensibleLoadManagerTest.java | 11 ++-
 .../server/ProxyWithJwtAuthorizationTest.java      | 18 +++--
 .../server/ProxyWithoutServiceDiscoveryTest.java   | 11 ++-
 .../SuperUserAuthedAdminProxyHandlerTest.java      | 12 ++-
 .../server/UnauthedAdminProxyHandlerTest.java      | 16 +++-
 41 files changed, 536 insertions(+), 94 deletions(-)

diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index a27384c9890..ca28befabc1 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -260,7 +260,11 @@ public class ProxySaslAuthenticationTest extends 
ProducerConsumerBase {
                proxyConfig.setForwardAuthorizationCredentials(true);
                AuthenticationService authenticationService = new 
AuthenticationService(
                         PulsarConfigurationLoader.convertFrom(proxyConfig));
-               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService);
+               @Cleanup
+               final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                               
proxyConfig.getBrokerClientAuthenticationParameters());
+               proxyClientAuthentication.start();
+               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
 
                proxyService.start();
                final String proxyServiceUrl = "pulsar://localhost:" + 
proxyService.getListenPort().get();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index caaa99c5d40..0108b770249 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import javax.net.ssl.SSLContext;
@@ -40,7 +39,6 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.KeyStoreParams;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.SecurityUtility;
@@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet {
 
     private final ProxyConfiguration config;
     private final BrokerDiscoveryProvider discoveryProvider;
+    private final Authentication proxyClientAuthentication;
     private final String brokerWebServiceUrl;
     private final String functionWorkerWebServiceUrl;
 
-    AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider) {
+    AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider,
+                      Authentication proxyClientAuthentication) {
         this.config = config;
         this.discoveryProvider = discoveryProvider;
+        this.proxyClientAuthentication = proxyClientAuthentication;
         this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? 
config.getBrokerWebServiceURLTLS()
                 : config.getBrokerWebServiceURL();
         this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? 
config.getFunctionWorkerWebServiceURLTLS()
@@ -256,22 +257,13 @@ class AdminProxyHandler extends ProxyServlet {
     @Override
     protected HttpClient newHttpClient() {
         try {
-            Authentication auth = AuthenticationFactory.create(
-                config.getBrokerClientAuthenticationPlugin(),
-                config.getBrokerClientAuthenticationParameters()
-            );
-
-            Objects.requireNonNull(auth, "No supported auth found for proxy");
-
-            auth.start();
-
             if (config.isTlsEnabledWithBroker()) {
                 try {
                     X509Certificate[] trustCertificates = SecurityUtility
                         
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());
 
                     SSLContext sslCtx;
-                    AuthenticationDataProvider authData = auth.getAuthData();
+                    AuthenticationDataProvider authData = 
proxyClientAuthentication.getAuthData();
                     if (config.isBrokerClientTlsEnabledWithKeyStore()) {
                         KeyStoreParams params = authData.hasDataForTls() ? 
authData.getTlsKeyStoreParams() : null;
                         sslCtx = KeyStoreSSLContext.createClientSslContext(
@@ -311,11 +303,6 @@ class AdminProxyHandler extends ProxyServlet {
                     return new JettyHttpClient(contextFactory);
                 } catch (Exception e) {
                     LOG.error("new jetty http client exception ", e);
-                    try {
-                        auth.close();
-                    } catch (IOException ioe) {
-                        LOG.error("Failed to close the authentication 
service", ioe);
-                    }
                     throw new 
PulsarClientException.InvalidConfigurationException(e.getMessage());
                 }
             }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index d63b04b6734..4678db82c6e 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -52,7 +52,6 @@ import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
@@ -114,8 +113,7 @@ public class DirectProxyHandler {
 
             if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
                 try {
-                    authData = 
AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
-                            
config.getBrokerClientAuthenticationParameters()).getAuthData();
+                    authData = authentication.getAuthData();
                 } catch (PulsarClientException e) {
                     throw new RuntimeException(e);
                 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index ea9e4ebfaa9..5cf01d6668b 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -64,8 +64,6 @@ import 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -158,7 +156,8 @@ public class ProxyService implements Closeable {
     private boolean gracefulShutdown = true;
 
     public ProxyService(ProxyConfiguration proxyConfig,
-                        AuthenticationService authenticationService) throws 
Exception {
+                        AuthenticationService authenticationService,
+                        Authentication proxyClientAuthentication) throws 
Exception {
         requireNonNull(proxyConfig);
         this.proxyConfig = proxyConfig;
         this.clientCnxs = Sets.newConcurrentHashSet();
@@ -207,12 +206,7 @@ public class ProxyService implements Closeable {
             });
         }, 60, TimeUnit.SECONDS);
         this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
-        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
-            proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                    proxyConfig.getBrokerClientAuthenticationParameters());
-        } else {
-            proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
-        }
+        this.proxyClientAuthentication = proxyClientAuthentication;
         this.connectionController = new 
ConnectionController.DefaultConnectionController(
                 proxyConfig.getMaxConcurrentInboundConnections(),
                 proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 10121e7f5d6..a5504cac100 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -29,11 +29,13 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
 import io.prometheus.client.hotspot.DefaultExports;
+import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import lombok.Getter;
@@ -44,6 +46,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import 
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -104,6 +110,9 @@ public class ProxyServiceStarter {
 
     private ProxyConfiguration config;
 
+    @Getter
+    private Authentication proxyClientAuthentication;
+
     @Getter
     private ProxyService proxyService;
 
@@ -244,8 +253,27 @@ public class ProxyServiceStarter {
     public void start() throws Exception {
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(config));
+
+        if (config.getBrokerClientAuthenticationPlugin() != null) {
+            proxyClientAuthentication = 
AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
+                    config.getBrokerClientAuthenticationParameters());
+            Objects.requireNonNull(proxyClientAuthentication, "No supported 
auth found for proxy");
+            try {
+                proxyClientAuthentication.start();
+            } catch (Exception e) {
+                try {
+                    proxyClientAuthentication.close();
+                } catch (IOException ioe) {
+                    log.error("Failed to close the authentication service", 
ioe);
+                }
+                throw new 
PulsarClientException.InvalidConfigurationException(e.getMessage());
+            }
+        } else {
+            proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
+        }
+
         // create proxy service
-        proxyService = new ProxyService(config, authenticationService);
+        proxyService = new ProxyService(config, authenticationService, 
proxyClientAuthentication);
         // create a web-service
         server = new WebServer(config, authenticationService);
 
@@ -293,7 +321,8 @@ public class ProxyServiceStarter {
         }
 
         AtomicReference<WebSocketService> webSocketServiceRef = new 
AtomicReference<>();
-        addWebServerHandlers(server, config, proxyService, 
proxyService.getDiscoveryProvider(), webSocketServiceRef);
+        addWebServerHandlers(server, config, proxyService, 
proxyService.getDiscoveryProvider(), webSocketServiceRef,
+                proxyClientAuthentication);
         webSocketService = webSocketServiceRef.get();
 
         // start web-service
@@ -311,6 +340,9 @@ public class ProxyServiceStarter {
             if (webSocketService != null) {
                 webSocketService.close();
             }
+            if (proxyClientAuthentication != null) {
+                proxyClientAuthentication.close();
+            }
         } catch (Exception e) {
             log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
         } finally {
@@ -323,15 +355,17 @@ public class ProxyServiceStarter {
     public static void addWebServerHandlers(WebServer server,
                                             ProxyConfiguration config,
                                             ProxyService service,
-                                            BrokerDiscoveryProvider 
discoveryProvider) throws Exception {
-        addWebServerHandlers(server, config, service, discoveryProvider, null);
+                                            BrokerDiscoveryProvider 
discoveryProvider,
+                                            Authentication 
proxyClientAuthentication) throws Exception {
+        addWebServerHandlers(server, config, service, discoveryProvider, null, 
proxyClientAuthentication);
     }
 
     public static void addWebServerHandlers(WebServer server,
                                             ProxyConfiguration config,
                                             ProxyService service,
                                             BrokerDiscoveryProvider 
discoveryProvider,
-                                            AtomicReference<WebSocketService> 
webSocketServiceRef) throws Exception {
+                                            AtomicReference<WebSocketService> 
webSocketServiceRef,
+                                            Authentication 
proxyClientAuthentication) throws Exception {
         // We can make 'status.html' publicly accessible without 
authentication since
         // it does not contain any sensitive data.
         server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, 
config.getStatusFilePath(),
@@ -348,7 +382,8 @@ public class ProxyServiceStarter {
             }
         }
 
-        AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, 
discoveryProvider);
+        AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, 
discoveryProvider,
+                proxyClientAuthentication);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         server.addServlet("/admin", servletHolder);
         server.addServlet("/lookup", servletHolder);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index f9ace716ecd..050199acc49 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -26,6 +26,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -121,6 +123,7 @@ public abstract class SimpleProxyExtensionTestBase extends 
MockedPulsarServiceBa
     private ProxyService proxyService;
     private boolean useSeparateThreadPoolForProxyExtensions;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     public SimpleProxyExtensionTestBase(boolean 
useSeparateThreadPoolForProxyExtensions) {
         this.useSeparateThreadPoolForProxyExtensions = 
useSeparateThreadPoolForProxyExtensions;
@@ -142,8 +145,12 @@ public abstract class SimpleProxyExtensionTestBase extends 
MockedPulsarServiceBa
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -174,6 +181,9 @@ public abstract class SimpleProxyExtensionTestBase extends 
MockedPulsarServiceBa
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
 
         if (tempDirectory != null) {
             FileUtils.deleteDirectory(tempDirectory);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index 92c644b470d..5995d11b33b 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -24,6 +24,8 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -47,6 +49,8 @@ public class AdminProxyHandlerKeystoreTLSTest extends 
MockedPulsarServiceBaseTes
 
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
+    private Authentication proxyClientAuthentication;
+
     private WebServer webServer;
 
     private BrokerDiscoveryProvider discoveryProvider;
@@ -103,6 +107,10 @@ public class AdminProxyHandlerKeystoreTLSTest extends 
MockedPulsarServiceBaseTes
                 KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         resource = new PulsarResources(registerCloseable(new 
ZKMetadataStore(mockZooKeeper)),
                 registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         webServer = new WebServer(proxyConfig, new AuthenticationService(
@@ -110,7 +118,7 @@ public class AdminProxyHandlerKeystoreTLSTest extends 
MockedPulsarServiceBaseTes
         discoveryProvider = spy(registerCloseable(new 
BrokerDiscoveryProvider(proxyConfig, resource)));
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), 
brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
-        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider));
+        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
         webServer.start();
@@ -120,6 +128,9 @@ public class AdminProxyHandlerKeystoreTLSTest extends 
MockedPulsarServiceBaseTes
     @Override
     protected void cleanup() throws Exception {
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         super.internalCleanup();
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
index becebe0059e..4f925618e8a 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
@@ -32,6 +32,7 @@ import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.client.api.Authentication;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.testng.Assert;
@@ -46,7 +47,7 @@ public class AdminProxyHandlerTest {
         // given
         HttpClient httpClient = mock(HttpClient.class);
         adminProxyHandler = new 
AdminProxyHandler(mock(ProxyConfiguration.class),
-                mock(BrokerDiscoveryProvider.class)) {
+                mock(BrokerDiscoveryProvider.class), 
mock(Authentication.class)) {
             @Override
             protected HttpClient createHttpClient() throws ServletException {
                 return httpClient;
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index ef58648e35a..97bb91d924c 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -32,6 +32,8 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -51,6 +53,7 @@ public class AuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     private static final Logger LOG = 
LoggerFactory.getLogger(AuthedAdminProxyHandlerTest.class);
 
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private WebServer webServer;
     private BrokerDiscoveryProvider discoveryProvider;
     private PulsarResources resource;
@@ -99,6 +102,10 @@ public class AuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
         
proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         resource = new PulsarResources(registerCloseable(new 
ZKMetadataStore(mockZooKeeper)),
                 registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         webServer = new WebServer(proxyConfig, new AuthenticationService(
@@ -107,7 +114,7 @@ public class AuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), 
brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
 
-        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider));
+        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 
@@ -119,6 +126,9 @@ public class AuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     @Override
     protected void cleanup() throws Exception {
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         super.internalCleanup();
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
index db5e9e12bd2..a07a0f082d3 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,8 +40,13 @@ public class FunctionWorkerRoutingTest {
         proxyConfig.setBrokerWebServiceURL(brokerUrl);
         proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl);
 
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         BrokerDiscoveryProvider discoveryProvider = 
mock(BrokerDiscoveryProvider.class);
-        AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, 
discoveryProvider);
+        AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, 
discoveryProvider, proxyClientAuthentication);
 
         String funcUrl = 
handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test"));
         Assert.assertEquals(funcUrl, 
String.format("%s/admin/v3/functions/%s/%s",
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
index c29bfaa9648..b7ef0855e38 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -33,7 +34,7 @@ public class InvalidProxyConfigForAuthorizationTest {
         proxyConfiguration.setAuthorizationEnabled(true);
         proxyConfiguration.setAuthenticationEnabled(false);
         try (ProxyService proxyService = new ProxyService(proxyConfiguration,
-                Mockito.mock(AuthenticationService.class))) {
+                Mockito.mock(AuthenticationService.class), Mockito.mock(Authentication.class))) {
             proxyService.start();
             fail("An exception should have been thrown");
         } catch (Exception e) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index f61a73bbf91..e12224da371 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -25,6 +25,8 @@ import okhttp3.Response;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
@@ -65,6 +67,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private WebServer proxyWebServer;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -83,8 +86,13 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
         // this is for nar package test
 //        addServletNar();
 
+        proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
-                new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+                proxyClientAuthentication));
         doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -99,7 +107,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
         mockAdditionalServlet();
 
         proxyWebServer = new WebServer(proxyConfig, authService);
-        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null, proxyClientAuthentication);
         proxyWebServer.start();
     }
 
@@ -180,6 +188,9 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
 
         proxyService.close();
         proxyWebServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 4083c984d98..2a9a9f15b45 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -74,6 +75,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private final String configClusterName = "test";
 
     @BeforeMethod
@@ -139,8 +141,12 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
-                                                            PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication));
         doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -152,6 +158,9 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 662b8305c0e..7d3cf57d594 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -235,7 +236,11 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
                 AuthenticationService authenticationService = new AuthenticationService(
                         PulsarConfigurationLoader.convertFrom(proxyConfig));
                @Cleanup
-               ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);
+               final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                               proxyConfig.getBrokerClientAuthenticationParameters());
+               proxyClientAuthentication.start();
+               @Cleanup
+               ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);
 
                proxyService.start();
                final String proxyServiceUrl = proxyService.getServiceUrl();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 78ab9bd0d95..671e68e5c3f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -27,6 +27,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.limiter.ConnectionController;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -46,6 +48,7 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
     private final int NUM_CONCURRENT_INBOUND_CONNECTION = 4;
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -60,8 +63,11 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
         
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
         
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
         proxyConfig.setClusterName(configClusterName);
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -74,6 +80,9 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
     protected void cleanup() throws Exception {
         internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
index 5ddb084e3c7..6a3992c550f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.proxy.server;
 public class ProxyDisableZeroCopyTest extends ProxyTest {
 
     @Override
-    protected void initializeProxyConfig() {
+    protected void initializeProxyConfig() throws Exception {
         super.initializeProxyConfig();
         proxyConfig.setProxyZeroCopyModeEnabled(false);
     }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 413774daf2c..40aa8f50405 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -22,6 +22,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -48,6 +50,7 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -62,8 +65,12 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
         proxyConfig.setHaProxyProtocolEnabled(true);
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -77,6 +84,9 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 5e969ca26e4..9c3a69b5f44 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -30,6 +30,8 @@ import lombok.Cleanup;
 
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -118,7 +120,11 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
 
         AuthenticationService authenticationService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
-        try (ProxyService proxyService = new ProxyService(proxyConfig, authenticationService)) {
+        @Cleanup
+        final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        try (ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication)) {
             proxyService.start();
             try (PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) {
                 proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
@@ -134,7 +140,7 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
 
         @Cleanup
-        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService);
+        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
         proxyService.start();
 
         @Cleanup
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
index 90e15ede2f4..cf587015544 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
@@ -33,9 +33,12 @@ import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.Response;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.eclipse.jetty.client.HttpClient;
@@ -197,10 +200,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -226,10 +233,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         try {
             Response r1 = 
client.target(webServer.getServiceUri()).path("/server1/foobar").request().get();
@@ -257,10 +268,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
 
     }
 
@@ -276,10 +291,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -303,10 +322,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -329,10 +352,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get();
@@ -354,6 +381,10 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         StringBuilder longUri = new StringBuilder("/service3/tp");
         for (int i = 10 * 1024; i > 0; i = i - 11){
@@ -362,7 +393,7 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
 
         WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, 
authService);
         ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, 
proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServerMaxUriLen8k.start();
         try {
             Response r = 
client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get();
@@ -374,7 +405,7 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024);
         WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, 
authService);
         ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, 
proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServerMaxUriLen12k.start();
         try {
             Response r = 
client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get();
@@ -395,10 +426,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -427,10 +462,14 @@ public class ProxyIsAHttpProxyTest extends 
MockedPulsarServiceBaseTest {
         ProxyConfiguration proxyConfig = 
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
         AuthenticationService authService = new AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
 
         HttpClient httpClient = new HttpClient();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5671c527f68..8aa5581a0fe 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -24,6 +24,8 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -40,6 +42,7 @@ import org.testng.annotations.Test;
 public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest 
{
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod
@@ -87,9 +90,13 @@ public class ProxyKeyStoreTlsTransportTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
         proxyConfig.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -103,6 +110,9 @@ public class ProxyKeyStoreTlsTransportTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     protected PulsarClient newClient() throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
index 99fb8c03a81..2c6d080bf2c 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
@@ -33,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -54,6 +56,7 @@ import org.testng.annotations.Test;
 public class ProxyKeyStoreTlsWithAuthTest extends MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod
@@ -88,9 +91,13 @@ public class ProxyKeyStoreTlsWithAuthTest extends 
MockedPulsarServiceBaseTest {
         providers.add(AuthenticationProviderTls.class.getName());
         proxyConfig.setAuthenticationProviders(providers);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -104,6 +111,9 @@ public class ProxyKeyStoreTlsWithAuthTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     protected PulsarClient internalSetUpForClient(boolean addCertificates, 
String lookupUrl) throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
index 1dcebda7935..3a20273b8c0 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
@@ -29,6 +29,8 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -50,6 +52,7 @@ import org.testng.annotations.Test;
 public class ProxyKeyStoreTlsWithoutAuthTest extends 
MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod
@@ -76,8 +79,12 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends 
MockedPulsarServiceBaseTest
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -109,6 +116,9 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends 
MockedPulsarServiceBaseTest
     protected void cleanup() throws Exception {
         internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index a9017404d0e..4d12fdd77e7 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -31,6 +31,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -53,6 +55,7 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
     private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5;
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeMethod(alwaysRun = true)
@@ -69,7 +72,10 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
 
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -84,6 +90,9 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
         if (proxyService != null) {
             proxyService.close();
         }
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test(groups = "quarantine")
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index fae44c00ada..ab428c31b7f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -48,6 +50,7 @@ public class ProxyMutualTlsTest extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -68,8 +71,12 @@ public class ProxyMutualTlsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setTlsAllowInsecureConnection(false);
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -83,6 +90,9 @@ public class ProxyMutualTlsTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 1a9459619eb..583ab7000e5 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -62,6 +64,7 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -75,9 +78,12 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setClusterName(configClusterName);
         //enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -93,6 +99,9 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
index b692987d17a..4dd7bc981e5 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -42,6 +42,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.awaitility.Awaitility;
@@ -59,6 +61,7 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private WebServer proxyWebServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -72,8 +75,13 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(TEST_CLUSTER);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
-                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+                proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -86,7 +94,7 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
 
         proxyWebServer = new WebServer(proxyConfig, authService);
-        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         proxyWebServer.start();
     }
 
@@ -109,6 +117,9 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
         if (proxyService != null) {
             proxyService.close();
         }
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index d06cf4201ff..bdabfecaa43 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -35,6 +35,8 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -57,6 +59,7 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
 
     private ProxyService proxyService;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     protected void doInitConf() throws Exception {
@@ -127,9 +130,13 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
         properties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
         proxyConfig.setProperties(properties);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                 new AuthenticationService(
-                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                        PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
     }
 
     @AfterClass(alwaysRun = true)
@@ -137,6 +144,9 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     private void startProxy(boolean forwardAuthData) throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index a1ffc13ee93..883b725e15d 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import javax.naming.AuthenticationException;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -35,6 +36,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -219,9 +221,14 @@ public class ProxyRolesEnforcementTest extends 
ProducerConsumerBase {
         providers.add(BasicAuthenticationProvider.class.getName());
         proxyConfig.setAuthenticationProviders(providers);
 
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         try (ProxyService proxyService = new ProxyService(proxyConfig,
                 new AuthenticationService(
-                        PulsarConfigurationLoader.convertFrom(proxyConfig)))) {
+                        PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication)) {
             proxyService.start();
 
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 0b9b6f17d12..d96d2cd1f6e 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -20,16 +20,22 @@ package org.apache.pulsar.proxy.server;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
+import java.util.function.Consumer;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.eclipse.jetty.client.HttpClient;
@@ -160,4 +166,89 @@ public class ProxyServiceStarterTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    /**
+     * Checks how {@link ProxyServiceStarter#start()} behaves when the configured broker-client
+     * {@link Authentication} plugin cannot be started.
+     *
+     * <p>Two plugins are exercised in turn: {@link ExceptionAuthentication1}, whose {@code start()}
+     * throws while {@code close()} is a no-op, and {@link ExceptionAuthentication2}, whose
+     * {@code start()} and {@code close()} both throw. In both cases the exception surfaced by
+     * {@code start()} must carry the message of the {@code Authentication#start} failure, and the
+     * starter must still expose an instance of the configured plugin class.
+     */
+    @Test
+    public void testProxyClientAuthentication() throws Exception {
+        final Consumer<ProxyConfiguration> initConfig = (proxyConfig) -> {
+            proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+            proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+            proxyConfig.setWebServicePort(Optional.of(0));
+            proxyConfig.setServicePort(Optional.of(0));
+            proxyConfig.setWebSocketServiceEnabled(true);
+            proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+            proxyConfig.setClusterName(configClusterName);
+        };
+
+
+
+        ProxyServiceStarter serviceStarter = new ProxyServiceStarter(ARGS, null, true);
+        initConfig.accept(serviceStarter.getConfig());
+        // ProxyServiceStarter must throw when Authentication#start fails
+        serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication1.class.getName());
+        try {
+            serviceStarter.start();
+            fail("ProxyServiceStarter should throw an exception when Authentication#start is failed");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("ExceptionAuthentication1#start"));
+            assertTrue(serviceStarter.getProxyClientAuthentication() instanceof ExceptionAuthentication1);
+        }
+
+        serviceStarter = new ProxyServiceStarter(ARGS, null, true);
+        initConfig.accept(serviceStarter.getConfig());
+        // ProxyServiceStarter must still report the start failure when Authentication#close fails as well
+        serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication2.class.getName());
+        try {
+            serviceStarter.start();
+            fail("ProxyServiceStarter should throw an exception when Authentication#start and Authentication#close are failed");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("ExceptionAuthentication2#start"));
+            assertTrue(serviceStarter.getProxyClientAuthentication() instanceof ExceptionAuthentication2);
+        }
+    }
+
+    /**
+     * {@link Authentication} stub whose {@link #start()} always fails while {@link #close()}
+     * succeeds. It is selected by class name through
+     * {@code setBrokerClientAuthenticationPlugin} in {@code testProxyClientAuthentication}.
+     */
+    public static class ExceptionAuthentication1 implements Authentication {
+
+        // NOTE(review): the name below mentions ProxyConfigurationTest although this class is
+        // nested in ProxyServiceStarterTest -- presumably a copy-paste leftover; confirm before changing.
+        @Override
+        public String getAuthMethodName() {
+            return "org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication1";
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // no-op
+        }
+
+        /** Always throws, carrying the marker message the test asserts on. */
+        @Override
+        public void start() throws PulsarClientException {
+            throw new PulsarClientException("ExceptionAuthentication1#start");
+        }
+
+        @Override
+        public void close() throws IOException {
+            // no-op
+        }
+    }
+
+    /**
+     * {@link Authentication} stub whose {@link #start()} and {@link #close()} both fail.
+     * It is selected by class name through {@code setBrokerClientAuthenticationPlugin} in
+     * {@code testProxyClientAuthentication}, which expects the {@code start()} failure to be
+     * the one reported.
+     */
+    public static class ExceptionAuthentication2 implements Authentication {
+
+        // NOTE(review): the name below mentions ProxyConfigurationTest although this class is
+        // nested in ProxyServiceStarterTest -- presumably a copy-paste leftover; confirm before changing.
+        @Override
+        public String getAuthMethodName() {
+            return "org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication2";
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // no-op
+        }
+
+        /** Always throws, carrying the marker message the test asserts on. */
+        @Override
+        public void start() throws PulsarClientException {
+            throw new PulsarClientException("ExceptionAuthentication2#start");
+        }
+
+        /** Always throws, so that closing the authentication fails as well. */
+        @Override
+        public void close() throws IOException {
+            throw new IOException("ExceptionAuthentication2#close");
+        }
+    }
+
 }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 2866c6c2690..86d572702f3 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -38,6 +38,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -61,6 +63,7 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
     private ProxyService proxyService;
     private WebServer proxyWebServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -76,8 +79,12 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
         // enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.of(2));
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
-                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -90,7 +97,7 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
 
         proxyWebServer = new WebServer(proxyConfig, authService);
-        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         proxyWebServer.start();
     }
 
@@ -109,6 +116,9 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
         internalCleanup();
         proxyService.close();
         proxyWebServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 6e66008c15a..30c6e45654b 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -28,6 +28,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
@@ -56,6 +58,7 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig;
+    private Authentication proxyClientAuthentication;
     private SocatContainer socatContainer;
 
     private String brokerServiceUriSocat;
@@ -81,6 +84,10 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         startProxyService();
         // use the same port for subsequent restarts
         proxyConfig.setServicePort(proxyService.getListenPort());
@@ -88,7 +95,7 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
 
     private void startProxyService() throws Exception {
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))) {
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication) {
             @Override
             protected LookupProxyHandler newLookupProxyHandler(ProxyConnection 
proxyConnection) {
                 return new TestLookupProxyHandler(this, proxyConnection);
@@ -107,6 +114,9 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
         if (proxyService != null) {
             proxyService.close();
         }
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         if (socatContainer != null) {
             socatContainer.close();
         }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index e1e49f9e8c5..e101eb4ff7a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -38,6 +38,8 @@ import org.apache.avro.reflect.Nullable;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -74,6 +76,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
 
     protected ProxyService proxyService;
     protected ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    protected Authentication proxyClientAuthentication;
 
     @Data
     @ToString
@@ -94,7 +97,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         initializeProxyConfig();
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -102,12 +105,16 @@ public class ProxyTest extends 
MockedPulsarServiceBaseTest {
         proxyService.start();
     }
 
-    protected void initializeProxyConfig() {
+    protected void initializeProxyConfig() throws Exception {
         proxyConfig.setServicePort(Optional.ofNullable(0));
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(configClusterName);
+
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
     }
 
     @Override
@@ -116,6 +123,9 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 4e300d39741..0f0dc30b620 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -45,6 +47,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest 
{
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @Override
     @BeforeClass
@@ -63,8 +66,12 @@ public class ProxyTlsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -78,6 +85,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest 
{
         internalCleanup();
 
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
index 16f610d6d0a..42b5ae178d3 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
@@ -27,6 +27,8 @@ import java.util.Optional;
 import org.apache.pulsar.broker.auth.MockOIDCIdentityProvider;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.mockito.Mockito;
@@ -38,6 +40,7 @@ public class ProxyTlsWithAuthTest extends 
MockedPulsarServiceBaseTest {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     private MockOIDCIdentityProvider server;
 
@@ -75,8 +78,12 @@ public class ProxyTlsWithAuthTest extends 
MockedPulsarServiceBaseTest {
             " \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}");
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-            PulsarConfigurationLoader.convertFrom(proxyConfig))));
+            PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -89,6 +96,9 @@ public class ProxyTlsWithAuthTest extends 
MockedPulsarServiceBaseTest {
     protected void cleanup() throws Exception {
         internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         server.stop();
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index cf9ad5831ec..92a54aa12fd 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -34,6 +34,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -73,6 +74,7 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @BeforeMethod
     @Override
@@ -138,7 +140,10 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
 
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication));
 
         proxyService.start();
     }
@@ -148,6 +153,9 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index bc96c7ea510..51f42ea0771 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -38,6 +38,7 @@ import 
org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -87,6 +88,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
     private ProxyService proxyService;
     private WebServer webServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @DataProvider(name = "hostnameVerification")
     public Object[][] hostnameVerificationCodecProvider() {
@@ -230,7 +232,10 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
 
         AuthenticationService authService =
                 new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, 
proxyClientAuthentication));
         proxyService.setGracefulShutdown(false);
         webServer = new WebServer(proxyConfig, authService);
     }
@@ -241,11 +246,14 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         super.internalCleanup();
         proxyService.close();
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     private void startProxy() throws Exception {
         proxyService.start();
-        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         webServer.start();
     }
 
@@ -459,10 +467,15 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setTlsProtocols(tlsProtocols);
         proxyConfig.setTlsCiphers(tlsCiphers);
 
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         @Cleanup
         ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                         new 
AuthenticationService(
-                                                                
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                                
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         proxyService.setGracefulShutdown(false);
         try {
             proxyService.start();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index d3c05fec721..3567c8264f1 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -49,6 +49,8 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -75,6 +77,7 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
 
     private static final int TEST_TIMEOUT_MS = 30_000;
 
+    private Authentication proxyClientAuthentication;
     private ProxyService proxyService;
 
     @Override
@@ -150,8 +153,11 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
     @BeforeMethod(alwaysRun = true)
     public void proxySetup() throws Exception {
         var proxyConfig = initializeProxyConfig();
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
                 .createConfigurationMetadataStore();
@@ -163,6 +169,9 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
         if (proxyService != null) {
             proxyService.close();
         }
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test(timeOut = TEST_TIMEOUT_MS)
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 5fb3e046824..63929ee72e4 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -39,6 +39,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -83,6 +84,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
     private ProxyService proxyService;
     private WebServer webServer;
     private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
     @BeforeMethod
     @Override
@@ -130,7 +132,10 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
 
         AuthenticationService authService =
                 new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig));
-        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService));
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, 
proxyClientAuthentication));
         webServer = new WebServer(proxyConfig, authService);
     }
 
@@ -140,11 +145,14 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         super.internalCleanup();
         proxyService.close();
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     private void startProxy() throws Exception {
         proxyService.start();
-        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null);
+        ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService, null, proxyClientAuthentication);
         webServer.start();
     }
 
@@ -425,7 +433,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
         final WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         @Cleanup
         final Client client = javax.ws.rs.client.ClientBuilder
@@ -450,7 +458,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setAuthenticateMetricsEndpoint(false);
         WebServer webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         @Cleanup
         Client client = javax.ws.rs.client.ClientBuilder.newClient(new 
ClientConfig().register(LoggingFeature.class));
@@ -463,7 +471,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setAuthenticateMetricsEndpoint(true);
         webServer = new WebServer(proxyConfig, authService);
         ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, 
proxyService,
-                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)));
+                registerCloseable(new BrokerDiscoveryProvider(proxyConfig, 
resource)), proxyClientAuthentication);
         webServer.start();
         try {
             Response r = 
client.target(webServer.getServiceUri()).path("/metrics").request().get();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 9d9490e74b5..885064b8e74 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -33,6 +33,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -57,6 +58,7 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
     private static final String CLUSTER_NAME = "without-service-discovery";
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
 
 
     @BeforeMethod
@@ -122,9 +124,13 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
 
         proxyConfig.setAuthenticationProviders(providers);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         proxyService = Mockito.spy(new ProxyService(proxyConfig,
                                                     new AuthenticationService(
-                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig)), 
proxyClientAuthentication));
 
         proxyService.start();
     }
@@ -134,6 +140,9 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
     protected void cleanup() throws Exception {
         super.internalCleanup();
         proxyService.close();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     /**
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index 57522186c8f..71025ed484f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -32,6 +32,8 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -47,6 +49,7 @@ import org.testng.annotations.Test;
 
 public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private WebServer webServer;
     private BrokerDiscoveryProvider discoveryProvider;
     private PulsarResources resource;
@@ -94,6 +97,10 @@ public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBas
         proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
         
proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         resource = new PulsarResources(registerCloseable(new 
ZKMetadataStore(mockZooKeeper)),
                 registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         webServer = new WebServer(proxyConfig, new AuthenticationService(
@@ -102,7 +109,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBas
         LoadManagerReport report = new LoadReport(brokerUrl.toString(), 
brokerUrlTls.toString(), null, null);
         doReturn(report).when(discoveryProvider).nextBroker();
 
-        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider));
+        ServletHolder servletHolder = new ServletHolder(new 
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 
@@ -114,6 +121,9 @@ public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBas
     @Override
     protected void cleanup() throws Exception {
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
         super.internalCleanup();
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index fe8b1f45385..0b597b93354 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -35,6 +35,8 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -49,6 +51,7 @@ import org.testng.annotations.Test;
 public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest 
{
     private final String STATUS_FILE_PATH = 
"./src/test/resources/vip_status.html";
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private Authentication proxyClientAuthentication;
     private WebServer webServer;
     private BrokerDiscoveryProvider discoveryProvider;
     private AdminProxyWrapper adminProxyHandler;
@@ -77,13 +80,17 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setClusterName(configClusterName);
 
+        proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           
PulsarConfigurationLoader.convertFrom(proxyConfig)));
 
         resource = new PulsarResources(registerCloseable(new 
ZKMetadataStore(mockZooKeeper)),
                 registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
         discoveryProvider = spy(registerCloseable(new 
BrokerDiscoveryProvider(proxyConfig, resource)));
-        adminProxyHandler = new AdminProxyWrapper(proxyConfig, 
discoveryProvider);
+        adminProxyHandler = new AdminProxyWrapper(proxyConfig, 
discoveryProvider, proxyClientAuthentication);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
@@ -101,6 +108,9 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     protected void cleanup() throws Exception {
         internalCleanup();
         webServer.stop();
+        if (proxyClientAuthentication != null) {
+            proxyClientAuthentication.close();
+        }
     }
 
     @Test
@@ -128,8 +138,8 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
     static class AdminProxyWrapper extends AdminProxyHandler {
         String rewrittenUrl;
 
-        AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider) {
-            super(config, discoveryProvider);
+        AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider 
discoveryProvider, Authentication proxyClientAuthentication) {
+            super(config, discoveryProvider, proxyClientAuthentication);
         }
 
         @Override

Reply via email to