This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b805a4ad45427a143d54013ad92035be06d8c641 Author: Yuri Mizushima <[email protected]> AuthorDate: Wed Aug 14 11:09:50 2024 +0900 [improve][proxy] Reuse authentication instance in pulsar-proxy (#23113) (cherry picked from commit 3e461c004ea229ef9b526a51fd0ed91e8157e873) --- .../ProxySaslAuthenticationTest.java | 6 +- .../pulsar/proxy/server/AdminProxyHandler.java | 23 ++---- .../pulsar/proxy/server/DirectProxyHandler.java | 4 +- .../apache/pulsar/proxy/server/ProxyService.java | 12 +-- .../pulsar/proxy/server/ProxyServiceStarter.java | 46 +++++++++-- .../extensions/SimpleProxyExtensionTestBase.java | 12 ++- .../server/AdminProxyHandlerKeystoreTLSTest.java | 25 ++++-- .../pulsar/proxy/server/AdminProxyHandlerTest.java | 3 +- .../proxy/server/AuthedAdminProxyHandlerTest.java | 15 +++- .../proxy/server/FunctionWorkerRoutingTest.java | 10 ++- .../InvalidProxyConfigForAuthorizationTest.java | 3 +- .../proxy/server/ProxyAdditionalServletTest.java | 55 +++++++------ .../ProxyAuthenticatedProducerConsumerTest.java | 11 ++- .../proxy/server/ProxyAuthenticationTest.java | 7 +- .../server/ProxyConnectionThrottlingTest.java | 11 ++- .../proxy/server/ProxyDisableZeroCopyTest.java | 25 +----- .../server/ProxyEnableHAProxyProtocolTest.java | 12 ++- .../proxy/server/ProxyForwardAuthDataTest.java | 10 ++- .../pulsar/proxy/server/ProxyIsAHttpProxyTest.java | 60 +++++++++++--- .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java | 12 ++- .../server/ProxyKeyStoreTlsTestWithoutAuth.java | 12 ++- .../server/ProxyKeyStoreTlsTransportTest.java | 12 ++- .../proxy/server/ProxyLookupThrottlingTest.java | 14 +++- .../pulsar/proxy/server/ProxyMutualTlsTest.java | 12 ++- .../pulsar/proxy/server/ProxyParserTest.java | 11 ++- .../proxy/server/ProxyPrometheusMetricsTest.java | 15 +++- .../pulsar/proxy/server/ProxyRefreshAuthTest.java | 12 ++- .../proxy/server/ProxyRolesEnforcementTest.java | 9 ++- .../proxy/server/ProxyServiceStarterTest.java | 91 ++++++++++++++++++++++ .../apache/pulsar/proxy/server/ProxyStatsTest.java | 14 +++- .../proxy/server/ProxyStuckConnectionTest.java | 12 ++- .../org/apache/pulsar/proxy/server/ProxyTest.java | 28 +++++-- .../apache/pulsar/proxy/server/ProxyTlsTest.java | 12 ++- .../pulsar/proxy/server/ProxyTlsTestWithAuth.java | 12 ++- .../server/ProxyWithAuthorizationNegTest.java | 10 ++- .../proxy/server/ProxyWithAuthorizationTest.java | 21 ++++- .../server/ProxyWithJwtAuthorizationTest.java | 28 +++++-- .../server/ProxyWithoutServiceDiscoveryTest.java | 11 ++- .../SuperUserAuthedAdminProxyHandlerTest.java | 15 +++- .../server/UnauthedAdminProxyHandlerTest.java | 16 +++- 40 files changed, 570 insertions(+), 159 deletions(-) diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java index f0e45aa734a..c4ccc31142c 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java @@ -258,7 +258,11 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase { proxyConfig.setForwardAuthorizationCredentials(true); AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); proxyService.start(); final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index caaa99c5d40..0108b770249 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -29,7 +29,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; import javax.net.ssl.SSLContext; @@ -40,7 +39,6 @@ import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.KeyStoreParams; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.SecurityUtility; @@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet { private final ProxyConfiguration config; private final BrokerDiscoveryProvider discoveryProvider; + private final Authentication proxyClientAuthentication; private final String brokerWebServiceUrl; private final String functionWorkerWebServiceUrl; - AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) { + AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider, + Authentication proxyClientAuthentication) { this.config = config; this.discoveryProvider = discoveryProvider; + this.proxyClientAuthentication = proxyClientAuthentication; this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS() : config.getBrokerWebServiceURL(); this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS() @@ -256,22 +257,13 @@ class AdminProxyHandler extends ProxyServlet { @Override protected HttpClient newHttpClient() { try { - Authentication auth = AuthenticationFactory.create( - config.getBrokerClientAuthenticationPlugin(), - config.getBrokerClientAuthenticationParameters() - ); - - Objects.requireNonNull(auth, "No supported auth found for proxy"); - - auth.start(); - if (config.isTlsEnabledWithBroker()) { try { X509Certificate[] trustCertificates = SecurityUtility .loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath()); SSLContext sslCtx; - AuthenticationDataProvider authData = auth.getAuthData(); + AuthenticationDataProvider authData = proxyClientAuthentication.getAuthData(); if (config.isBrokerClientTlsEnabledWithKeyStore()) { KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null; sslCtx = KeyStoreSSLContext.createClientSslContext( @@ -311,11 +303,6 @@ class AdminProxyHandler extends ProxyServlet { return new JettyHttpClient(contextFactory); } catch (Exception e) { LOG.error("new jetty http client exception ", e); - try { - auth.close(); - } catch (IOException ioe) { - LOG.error("Failed to close the authentication service", ioe); - } throw new PulsarClientException.InvalidConfigurationException(e.getMessage()); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index d63b04b6734..4678db82c6e 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -52,7 +52,6 @@ import lombok.Getter; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; @@ -114,8 +113,7 @@ public class DirectProxyHandler { if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) { try { - authData = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(), - config.getBrokerClientAuthenticationParameters()).getAuthData(); + authData = authentication.getAuthData(); } catch (PulsarClientException e) { throw new RuntimeException(e); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 64885413320..6ee9f5bcdfd 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -63,8 +63,6 @@ import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.DnsResolverUtil; @@ -152,7 +150,8 @@ public class ProxyService implements Closeable { private final ConnectionController connectionController; public ProxyService(ProxyConfiguration proxyConfig, - AuthenticationService authenticationService) throws Exception { + AuthenticationService authenticationService, + Authentication proxyClientAuthentication) throws Exception { requireNonNull(proxyConfig); this.proxyConfig = proxyConfig; this.clientCnxs = Sets.newConcurrentHashSet(); @@ -201,12 +200,7 @@ public class ProxyService implements Closeable { }); }, 60, TimeUnit.SECONDS); this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig); - if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { - proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), - proxyConfig.getBrokerClientAuthenticationParameters()); - } else { - proxyClientAuthentication = AuthenticationDisabled.INSTANCE; - } + this.proxyClientAuthentication = proxyClientAuthentication; this.connectionController = new ConnectionController.DefaultConnectionController( proxyConfig.getMaxConcurrentInboundConnections(), proxyConfig.getMaxConcurrentInboundConnectionsPerIp()); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index cba7b4b6ad5..19811c90a4f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -31,11 +31,13 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import io.prometheus.client.Gauge.Child; import io.prometheus.client.hotspot.DefaultExports; +import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Objects; import java.util.function.Consumer; import lombok.Getter; import org.apache.logging.log4j.LogManager; @@ -45,6 +47,10 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.policies.data.ClusterData; @@ -101,6 +107,9 @@ public class ProxyServiceStarter { private ProxyConfiguration config; + @Getter + private Authentication proxyClientAuthentication; + @Getter private ProxyService proxyService; @@ -241,8 +250,27 @@ public class ProxyServiceStarter { public void start() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(config)); + + if (config.getBrokerClientAuthenticationPlugin() != null) { + proxyClientAuthentication = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(), + config.getBrokerClientAuthenticationParameters()); + Objects.requireNonNull(proxyClientAuthentication, "No supported auth found for proxy"); + try { + proxyClientAuthentication.start(); + } catch (Exception e) { + try { + proxyClientAuthentication.close(); + } catch (IOException ioe) { + log.error("Failed to close the authentication service", ioe); + } + throw new PulsarClientException.InvalidConfigurationException(e.getMessage()); + } + } else { + proxyClientAuthentication = AuthenticationDisabled.INSTANCE; + } + // create proxy service - proxyService = new ProxyService(config, authenticationService); + proxyService = new ProxyService(config, authenticationService, proxyClientAuthentication); // create a web-service server = new WebServer(config, authenticationService); @@ -289,7 +317,8 @@ public class ProxyServiceStarter { metricsInitialized = true; } - addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider()); + addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), + proxyClientAuthentication); // start web-service server.start(); @@ -303,6 +332,9 @@ public class ProxyServiceStarter { if (server != null) { server.stop(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } catch (Exception e) { log.warn("server couldn't stop gracefully {}", e.getMessage(), e); } finally { @@ -313,9 +345,10 @@ public class ProxyServiceStarter { } public static void addWebServerHandlers(WebServer server, - ProxyConfiguration config, - ProxyService service, - BrokerDiscoveryProvider discoveryProvider) throws Exception { + ProxyConfiguration config, + ProxyService service, + BrokerDiscoveryProvider discoveryProvider, + Authentication proxyClientAuthentication) throws Exception { // We can make 'status.html' publicly accessible without authentication since // it does not contain any sensitive data. server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), @@ -332,7 +365,8 @@ public class ProxyServiceStarter { } } - AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider); + AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider, + proxyClientAuthentication); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); server.addServlet("/admin", servletHolder); server.addServlet("/lookup", servletHolder); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index 79662097c3b..4f5436dccd6 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -26,6 +26,8 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -121,6 +123,7 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa private ProxyService proxyService; private boolean useSeparateThreadPoolForProxyExtensions; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) { this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions; @@ -141,8 +144,12 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -172,6 +179,9 @@ public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBa protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } if (tempDirectory != null) { FileUtils.deleteDirectory(tempDirectory); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java index d6796b7eaa6..b4be7bebb83 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java @@ -18,11 +18,18 @@ */ package org.apache.pulsar.proxy.server; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; @@ -34,18 +41,13 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; - -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTest { private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; + private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; @@ -103,12 +105,16 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); - ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider)); + ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication)); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); webServer.start(); @@ -118,6 +124,9 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes @Override protected void cleanup() throws Exception { webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } super.internalCleanup(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java index becebe0059e..4f925618e8a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java @@ -32,6 +32,7 @@ import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.pulsar.client.api.Authentication; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.testng.Assert; @@ -46,7 +47,7 @@ public class AdminProxyHandlerTest { // given HttpClient httpClient = mock(HttpClient.class); adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class), - mock(BrokerDiscoveryProvider.class)) { + mock(BrokerDiscoveryProvider.class), mock(Authentication.class)) { @Override protected HttpClient createHttpClient() throws ServletException { return httpClient; diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index af70276aed9..840bfe17fe4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -20,18 +20,17 @@ package org.apache.pulsar.proxy.server; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; - import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; - import java.util.Optional; - import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; @@ -51,6 +50,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { private static final Logger LOG = LoggerFactory.getLogger(AuthedAdminProxyHandlerTest.class); private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; private PulsarResources resource; @@ -100,13 +100,17 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); - ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider)); + ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication)); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); @@ -118,6 +122,9 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { @Override protected void cleanup() throws Exception { webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } super.internalCleanup(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java index db5e9e12bd2..a07a0f082d3 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.proxy.server; +import lombok.Cleanup; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.testng.Assert; import org.testng.annotations.Test; @@ -37,8 +40,13 @@ public class FunctionWorkerRoutingTest { proxyConfig.setBrokerWebServiceURL(brokerUrl); proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + BrokerDiscoveryProvider discoveryProvider = mock(BrokerDiscoveryProvider.class); - AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, discoveryProvider); + AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication); String funcUrl = handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test")); Assert.assertEquals(funcUrl, String.format("%s/admin/v3/functions/%s/%s", diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java index c29bfaa9648..b7ef0855e38 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -33,7 +34,7 @@ public class InvalidProxyConfigForAuthorizationTest { proxyConfiguration.setAuthorizationEnabled(true); proxyConfiguration.setAuthenticationEnabled(false); try (ProxyService proxyService = new ProxyService(proxyConfiguration, - Mockito.mock(AuthenticationService.class))) { + Mockito.mock(AuthenticationService.class), Mockito.mock(Authentication.class))) { proxyService.start(); fail("An exception should have been thrown"); } catch (Exception e) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java index 17cd3c33e79..67cf5aca911 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java @@ -18,18 +18,36 @@ */ package org.apache.pulsar.proxy.server; +import static org.mockito.Mockito.doReturn; +import static org.testng.Assert.assertEquals; import com.google.common.collect.Sets; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import javax.servlet.Servlet; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import okhttp3.Response; import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; -import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; -import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.servlet.ServletHolder; import org.mockito.Mockito; @@ -38,24 +56,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import javax.servlet.Servlet; -import javax.servlet.ServletConfig; -import javax.servlet.ServletException; -import javax.servlet.ServletOutputStream; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; -import java.io.IOException; -import java.net.URL; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -import static org.mockito.Mockito.doReturn; -import static org.testng.Assert.assertEquals; - @Slf4j public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest { @@ -65,6 +65,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private WebServer proxyWebServer; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -82,8 +83,13 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest { // this is for nar package test // addServletNar(); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), + proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -97,7 +103,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest { mockAdditionalServlet(); proxyWebServer = new WebServer(proxyConfig, authService); - ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null, proxyClientAuthentication); proxyWebServer.start(); } @@ -177,6 +183,9 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index bfe86f86976..ab97bf05201 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -74,6 +75,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private final String configClusterName = "test"; @BeforeMethod @@ -138,8 +140,12 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); proxyService.start(); @@ -150,6 +156,9 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 9c8e5197adf..3207c2c3d6a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -233,7 +234,11 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); @Cleanup - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + @Cleanup + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); proxyService.start(); final String proxyServiceUrl = proxyService.getServiceUrl(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 336f11ae19d..5d950d847ef 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -27,6 +27,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.limiter.ConnectionController; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -46,6 +48,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { private final int NUM_CONCURRENT_INBOUND_CONNECTION = 4; private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -59,8 +62,11 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -72,6 +78,9 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java index 3aa71413d54..6a3992c550f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java @@ -18,32 +18,11 @@ */ package org.apache.pulsar.proxy.server; -import static org.mockito.Mockito.doReturn; -import java.util.Optional; -import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; -import org.apache.pulsar.metadata.impl.ZKMetadataStore; -import org.mockito.Mockito; -import org.testng.annotations.BeforeClass; - public class ProxyDisableZeroCopyTest extends ProxyTest { @Override - @BeforeClass - protected void setup() throws Exception { - internalSetup(); - - proxyConfig.setServicePort(Optional.ofNullable(0)); - proxyConfig.setBrokerProxyAllowedTargetPorts("*"); - proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); - proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + protected void initializeProxyConfig() throws Exception { + super.initializeProxyConfig(); proxyConfig.setProxyZeroCopyModeEnabled(false); - - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); - - proxyService.start(); } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 8b3092c6f51..77212c9535b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -22,6 +22,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -48,6 +50,7 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -61,8 +64,12 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setHaProxyProtocolEnabled(true); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -75,6 +82,9 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index b7cfb874747..8e3a2fdf264 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -30,6 +30,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -116,7 +118,11 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - try (ProxyService proxyService = new ProxyService(proxyConfig, authenticationService)) { + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + try (ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication)) { proxyService.start(); try (PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) { proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); @@ -132,7 +138,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { PulsarConfigurationLoader.convertFrom(proxyConfig)); @Cleanup - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); proxyService.start(); @Cleanup diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java index 246dd9f85e3..be51bd18323 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java @@ -36,10 +36,12 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.Response; - +import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.eclipse.jetty.client.HttpClient; @@ -201,10 +203,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -230,10 +236,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); try { Response r1 = client.target(webServer.getServiceUri()).path("/server1/foobar").request().get(); @@ -261,10 +271,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); } @@ -280,10 +294,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -307,10 +325,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -333,10 +355,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get(); @@ -358,6 +384,10 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); StringBuilder longUri = new StringBuilder("/service3/tp"); for (int i = 10 * 1024; i > 0; i = i - 11){ @@ -366,7 +396,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServerMaxUriLen8k.start(); try { Response r = client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get(); @@ -378,7 +408,7 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024); WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServerMaxUriLen12k.start(); try { Response r = client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get(); @@ -399,10 +429,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -431,10 +465,14 @@ public class ProxyIsAHttpProxyTest extends MockedPulsarServiceBaseTest { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); HttpClient httpClient = new HttpClient(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java index 88e7b269d6e..d22c64f977f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java @@ -33,6 +33,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -54,6 +56,7 @@ import org.testng.annotations.Test; public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod @@ -87,9 +90,13 @@ public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest { providers.add(AuthenticationProviderTls.class.getName()); proxyConfig.setAuthenticationProviders(providers); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -102,6 +109,9 @@ public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } protected PulsarClient internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java index 5feef74e3b9..050b230ba4d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java @@ -29,6 +29,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -50,6 +52,7 @@ import org.testng.annotations.Test; public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod @@ -75,8 +78,12 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -107,6 +114,9 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java index 5c4e40ed65a..2a9bc720b79 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java @@ -24,6 +24,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -40,6 +42,7 @@ import org.testng.annotations.Test; public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod @@ -86,9 +89,13 @@ public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest { proxyConfig.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH); proxyConfig.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -101,6 +108,9 @@ public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } protected PulsarClient newClient() throws Exception { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 1b63aa14dfe..9348a4df4db 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -21,16 +21,15 @@ package org.apache.pulsar.proxy.server; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import java.util.Optional; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; - import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -53,6 +52,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5; private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod(alwaysRun = true) @@ -68,7 +68,10 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -82,6 +85,9 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { if (proxyService != null) { proxyService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test(groups = "quarantine") diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java index ad237c25397..a73b2d388f8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -48,6 +50,7 @@ public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -67,8 +70,12 @@ public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest { proxyConfig.setTlsRequireTrustedClientCertOnConnect(true); proxyConfig.setTlsAllowInsecureConnection(false); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -81,6 +88,9 @@ public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index 82cd702aa7f..74a4435b768 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -64,6 +66,7 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -76,9 +79,12 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); //enable full parsing feature proxyConfig.setProxyLogLevel(Optional.ofNullable(2)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -93,6 +99,9 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java index 6948996ad46..69dcda2a4d7 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java @@ -42,6 +42,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.awaitility.Awaitility; @@ -59,6 +61,7 @@ public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private WebServer proxyWebServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -72,8 +75,13 @@ public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(TEST_CLUSTER); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), + proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -85,7 +93,7 @@ public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest { PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyWebServer = new WebServer(proxyConfig, authService); - ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null, proxyClientAuthentication); proxyWebServer.start(); } @@ -108,6 +116,9 @@ public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest { if (proxyService != null) { proxyService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java index 2f36cc679f1..b058e4af830 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java @@ -35,6 +35,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ClientCnx; @@ -56,6 +58,7 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase { private ProxyService proxyService; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override protected void doInitConf() throws Exception { @@ -125,9 +128,13 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase { properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); proxyConfig.setProperties(properties); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); } @AfterClass(alwaysRun = true) @@ -135,6 +142,9 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } private void startProxy(boolean forwardAuthData) throws Exception { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 3259cfd95c7..e32f79b0f0f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import javax.naming.AuthenticationException; +import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; @@ -35,6 +36,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -217,9 +219,14 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase { providers.add(BasicAuthenticationProvider.class.getName()); proxyConfig.setAuthenticationProviders(providers); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + try (ProxyService proxyService = new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))) { + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)) { proxyService.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 759eabdb5a2..e12415b0d91 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -20,16 +20,22 @@ package org.apache.pulsar.proxy.server; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Future; +import java.util.function.Consumer; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ProducerMessage; import org.eclipse.jetty.client.HttpClient; @@ -155,4 +161,89 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { } } + @Test + public void testProxyClientAuthentication() throws Exception { + final Consumer<ProxyConfiguration> initConfig = (proxyConfig) -> { + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress()); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebSocketServiceEnabled(true); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setClusterName(configClusterName); + }; + + + + ProxyServiceStarter serviceStarter = new ProxyServiceStarter(ARGS, null, true); + initConfig.accept(serviceStarter.getConfig()); + // ProxyServiceStarter will throw an exception when Authentication#start is failed + serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication1.class.getName()); + try { + serviceStarter.start(); + fail("ProxyServiceStarter should throw an exception when Authentication#start is failed"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("ExceptionAuthentication1#start")); + assertTrue(serviceStarter.getProxyClientAuthentication() instanceof ExceptionAuthentication1); + } + + serviceStarter = new ProxyServiceStarter(ARGS, null, true); + initConfig.accept(serviceStarter.getConfig()); + // ProxyServiceStarter will throw an exception when Authentication#start and Authentication#close are failed + serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication2.class.getName()); + try { + serviceStarter.start(); + fail("ProxyServiceStarter should throw an exception when Authentication#start and Authentication#close are failed"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("ExceptionAuthentication2#start")); + assertTrue(serviceStarter.getProxyClientAuthentication() instanceof ExceptionAuthentication2); + } + } + + public static class ExceptionAuthentication1 implements Authentication { + + @Override + public String getAuthMethodName() { + return "org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication1"; + } + + @Override + public void configure(Map<String, String> authParams) { + // no-op + } + + @Override + public void start() throws PulsarClientException { + throw new PulsarClientException("ExceptionAuthentication1#start"); + } + + @Override + public void close() throws IOException { + // no-op + } + } + + public static class ExceptionAuthentication2 implements Authentication { + + @Override + public String getAuthMethodName() { + return "org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication2"; + } + + @Override + public void configure(Map<String, String> authParams) { + // no-op + } + + @Override + public void start() throws PulsarClientException { + throw new PulsarClientException("ExceptionAuthentication2#start"); + } + + @Override + public void close() throws IOException { + throw new IOException("ExceptionAuthentication2#close"); + } + } + } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java index 140af88aae7..5769571d187 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java @@ -38,6 +38,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -61,6 +63,7 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private WebServer proxyWebServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -75,8 +78,12 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest { // enable full parsing feature proxyConfig.setProxyLogLevel(Optional.of(2)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -88,7 +95,7 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest { PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyWebServer = new WebServer(proxyConfig, authService); - ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null, proxyClientAuthentication); proxyWebServer.start(); } @@ -106,6 +113,9 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest { protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java index 97279659af6..5052f598dba 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java @@ -28,6 +28,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Message; @@ -56,6 +58,7 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig; + private Authentication proxyClientAuthentication; private SocatContainer socatContainer; private String brokerServiceUriSocat; @@ -80,6 +83,10 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest { proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + startProxyService(); // use the same port for subsequent restarts proxyConfig.setServicePort(proxyService.getListenPort()); @@ -87,7 +94,7 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest { private void startProxyService() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig))) { + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication) { @Override protected LookupProxyHandler newLookupProxyHandler(ProxyConnection proxyConnection) { return new TestLookupProxyHandler(this, proxyConnection); @@ -105,6 +112,9 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest { if (proxyService != null) { proxyService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } if (socatContainer != null) { socatContainer.close(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index e799e2e948a..6fab7d6af21 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -38,6 +38,8 @@ import org.apache.avro.reflect.Nullable; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -73,6 +75,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { protected ProxyService proxyService; protected ProxyConfiguration proxyConfig = new ProxyConfiguration(); + protected Authentication proxyClientAuthentication; @Data @ToString @@ -90,25 +93,38 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { protected void setup() throws Exception { internalSetup(); - proxyConfig.setServicePort(Optional.ofNullable(0)); - proxyConfig.setBrokerProxyAllowedTargetPorts("*"); - proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); - proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + initializeProxyConfig(); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService) + .createConfigurationMetadataStore(); proxyService.start(); } + protected void initializeProxyConfig() throws Exception { + proxyConfig.setServicePort(Optional.ofNullable(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); + proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(configClusterName); + + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + } + @Override @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index 64b0cd6b1a6..79a55dea674 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -45,6 +47,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -62,8 +65,12 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -76,6 +83,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java index 0f1fa74a209..2a68d92c9e1 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java @@ -27,6 +27,8 @@ import java.util.Optional; import org.apache.pulsar.broker.auth.MockOIDCIdentityProvider; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -38,6 +40,7 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private MockOIDCIdentityProvider server; @@ -74,8 +77,12 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest { " \"audience\": \"an-audience\"," + " \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}"); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -87,6 +94,9 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest { protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } server.stop(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index 2d97a4b06a8..b10fb221600 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -72,6 +73,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @BeforeMethod @Override @@ -136,7 +138,10 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication)); proxyService.start(); } @@ -146,6 +151,9 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index 31757cc0367..9087e8f477e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -86,6 +87,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { private ProxyService proxyService; private WebServer webServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @DataProvider(name = "hostnameVerification") public Object[][] hostnameVerificationCodecProvider() { @@ -228,7 +230,10 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { AuthenticationService authService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, proxyClientAuthentication)); webServer = new WebServer(proxyConfig, authService); } @@ -238,11 +243,14 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { super.internalCleanup(); proxyService.close(); webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } private void startProxy() throws Exception { proxyService.start(); - ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null, proxyClientAuthentication); webServer.start(); } @@ -415,7 +423,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { * This test verifies whether the Client and Proxy honor the protocols and ciphers specified. Details description of * test cases can be found in protocolsCiphersProviderCodecProvider */ - @Test(dataProvider = "protocolsCiphersProvider", timeOut = 5000) + @Test(dataProvider = "protocolsCiphersProvider", timeOut = 10000) public void tlsCiphersAndProtocols(Set<String> tlsCiphers, Set<String> tlsProtocols, boolean expectFailure) throws Exception { log.info("-- Starting {} test --", methodName); @@ -455,9 +463,14 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { proxyConfig.setTlsProtocols(tlsProtocols); proxyConfig.setTlsCiphers(tlsCiphers); + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + + @Cleanup ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); try { proxyService.start(); } catch (Exception ex) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index 88ecfe8a318..6b1890e2326 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -39,7 +39,16 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; @@ -74,6 +83,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { private ProxyService proxyService; private WebServer webServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @BeforeMethod @Override @@ -120,7 +130,10 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { AuthenticationService authService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, proxyClientAuthentication)); webServer = new WebServer(proxyConfig, authService); } @@ -130,11 +143,14 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { super.internalCleanup(); proxyService.close(); webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } private void startProxy() throws Exception { proxyService.start(); - ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null, proxyClientAuthentication); webServer.start(); } @@ -415,7 +431,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { PulsarConfigurationLoader.convertFrom(proxyConfig)); final WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); @Cleanup final Client client = javax.ws.rs.client.ClientBuilder @@ -440,7 +456,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { proxyConfig.setAuthenticateMetricsEndpoint(false); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); @Cleanup Client client = javax.ws.rs.client.ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); @@ -453,7 +469,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { proxyConfig.setAuthenticateMetricsEndpoint(true); webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, - new BrokerDiscoveryProvider(proxyConfig, resource)); + new BrokerDiscoveryProvider(proxyConfig, resource), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/metrics").request().get(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index ec1412b021d..23fdd47c280 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -56,6 +57,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class); private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @BeforeMethod @Override @@ -122,9 +124,13 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { proxyConfig.setAuthenticationProviders(providers); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); proxyService.start(); } @@ -134,6 +140,9 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index d3291c8fb91..0d510281134 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -20,18 +20,17 @@ package org.apache.pulsar.proxy.server; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; - import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; - import java.util.Optional; - import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; @@ -47,6 +46,7 @@ import org.testng.annotations.Test; public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; private PulsarResources resource; @@ -95,13 +95,17 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); - ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider)); + ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication)); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); @@ -113,6 +117,9 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas @Override protected void cleanup() throws Exception { webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } super.internalCleanup(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index aa4aeaa2ea8..78ac1e46a15 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -35,6 +35,8 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -49,6 +51,7 @@ import org.testng.annotations.Test; public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { private final String STATUS_FILE_PATH = "./src/test/resources/vip_status.html"; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; private AdminProxyWrapper adminProxyHandler; @@ -76,13 +79,17 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); - adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider); + adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider, proxyClientAuthentication); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); @@ -100,6 +107,9 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { protected void cleanup() throws Exception { internalCleanup(); webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test @@ -126,8 +136,8 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { static class AdminProxyWrapper extends AdminProxyHandler { String rewrittenUrl; - AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) { - super(config, discoveryProvider); + AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider, Authentication proxyClientAuthentication) { + super(config, discoveryProvider, proxyClientAuthentication); } @Override
