This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d5833f98f7f [improve][proxy] Reuse authentication instance in
pulsar-proxy (#23113)
d5833f98f7f is described below
commit d5833f98f7f5456ea58b42c1803783d03da1ca1d
Author: Yuri Mizushima <[email protected]>
AuthorDate: Wed Aug 14 11:09:50 2024 +0900
[improve][proxy] Reuse authentication instance in pulsar-proxy (#23113)
---
.../ProxySaslAuthenticationTest.java | 6 +-
.../pulsar/proxy/server/AdminProxyHandler.java | 23 ++----
.../pulsar/proxy/server/DirectProxyHandler.java | 4 +-
.../apache/pulsar/proxy/server/ProxyService.java | 12 +--
.../pulsar/proxy/server/ProxyServiceStarter.java | 47 +++++++++--
.../extensions/SimpleProxyExtensionTestBase.java | 12 ++-
.../server/AdminProxyHandlerKeystoreTLSTest.java | 13 +++-
.../pulsar/proxy/server/AdminProxyHandlerTest.java | 3 +-
.../proxy/server/AuthedAdminProxyHandlerTest.java | 12 ++-
.../proxy/server/FunctionWorkerRoutingTest.java | 10 ++-
.../InvalidProxyConfigForAuthorizationTest.java | 3 +-
.../proxy/server/ProxyAdditionalServletTest.java | 15 +++-
.../ProxyAuthenticatedProducerConsumerTest.java | 11 ++-
.../proxy/server/ProxyAuthenticationTest.java | 7 +-
.../server/ProxyConnectionThrottlingTest.java | 11 ++-
.../proxy/server/ProxyDisableZeroCopyTest.java | 2 +-
.../server/ProxyEnableHAProxyProtocolTest.java | 12 ++-
.../proxy/server/ProxyForwardAuthDataTest.java | 10 ++-
.../pulsar/proxy/server/ProxyIsAHttpProxyTest.java | 59 +++++++++++---
.../server/ProxyKeyStoreTlsTransportTest.java | 12 ++-
.../proxy/server/ProxyKeyStoreTlsWithAuthTest.java | 12 ++-
.../server/ProxyKeyStoreTlsWithoutAuthTest.java | 12 ++-
.../proxy/server/ProxyLookupThrottlingTest.java | 11 ++-
.../pulsar/proxy/server/ProxyMutualTlsTest.java | 12 ++-
.../pulsar/proxy/server/ProxyParserTest.java | 11 ++-
.../proxy/server/ProxyPrometheusMetricsTest.java | 15 +++-
.../pulsar/proxy/server/ProxyRefreshAuthTest.java | 12 ++-
.../proxy/server/ProxyRolesEnforcementTest.java | 9 ++-
.../proxy/server/ProxyServiceStarterTest.java | 91 ++++++++++++++++++++++
.../apache/pulsar/proxy/server/ProxyStatsTest.java | 14 +++-
.../proxy/server/ProxyStuckConnectionTest.java | 12 ++-
.../org/apache/pulsar/proxy/server/ProxyTest.java | 14 +++-
.../apache/pulsar/proxy/server/ProxyTlsTest.java | 12 ++-
.../pulsar/proxy/server/ProxyTlsWithAuthTest.java | 12 ++-
.../server/ProxyWithAuthorizationNegTest.java | 10 ++-
.../proxy/server/ProxyWithAuthorizationTest.java | 19 ++++-
.../server/ProxyWithExtensibleLoadManagerTest.java | 11 ++-
.../server/ProxyWithJwtAuthorizationTest.java | 18 +++--
.../server/ProxyWithoutServiceDiscoveryTest.java | 11 ++-
.../SuperUserAuthedAdminProxyHandlerTest.java | 12 ++-
.../server/UnauthedAdminProxyHandlerTest.java | 16 +++-
41 files changed, 536 insertions(+), 94 deletions(-)
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index a27384c9890..ca28befabc1 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -260,7 +260,11 @@ public class ProxySaslAuthenticationTest extends
ProducerConsumerBase {
proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
- ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService);
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+
proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
proxyService.start();
final String proxyServiceUrl = "pulsar://localhost:" +
proxyService.getListenPort().get();
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index caaa99c5d40..0108b770249 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
@@ -40,7 +39,6 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.SecurityUtility;
@@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet {
private final ProxyConfiguration config;
private final BrokerDiscoveryProvider discoveryProvider;
+ private final Authentication proxyClientAuthentication;
private final String brokerWebServiceUrl;
private final String functionWorkerWebServiceUrl;
- AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider
discoveryProvider) {
+ AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider
discoveryProvider,
+ Authentication proxyClientAuthentication) {
this.config = config;
this.discoveryProvider = discoveryProvider;
+ this.proxyClientAuthentication = proxyClientAuthentication;
this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ?
config.getBrokerWebServiceURLTLS()
: config.getBrokerWebServiceURL();
this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ?
config.getFunctionWorkerWebServiceURLTLS()
@@ -256,22 +257,13 @@ class AdminProxyHandler extends ProxyServlet {
@Override
protected HttpClient newHttpClient() {
try {
- Authentication auth = AuthenticationFactory.create(
- config.getBrokerClientAuthenticationPlugin(),
- config.getBrokerClientAuthenticationParameters()
- );
-
- Objects.requireNonNull(auth, "No supported auth found for proxy");
-
- auth.start();
-
if (config.isTlsEnabledWithBroker()) {
try {
X509Certificate[] trustCertificates = SecurityUtility
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());
SSLContext sslCtx;
- AuthenticationDataProvider authData = auth.getAuthData();
+ AuthenticationDataProvider authData =
proxyClientAuthentication.getAuthData();
if (config.isBrokerClientTlsEnabledWithKeyStore()) {
KeyStoreParams params = authData.hasDataForTls() ?
authData.getTlsKeyStoreParams() : null;
sslCtx = KeyStoreSSLContext.createClientSslContext(
@@ -311,11 +303,6 @@ class AdminProxyHandler extends ProxyServlet {
return new JettyHttpClient(contextFactory);
} catch (Exception e) {
LOG.error("new jetty http client exception ", e);
- try {
- auth.close();
- } catch (IOException ioe) {
- LOG.error("Failed to close the authentication
service", ioe);
- }
throw new
PulsarClientException.InvalidConfigurationException(e.getMessage());
}
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index d63b04b6734..4678db82c6e 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -52,7 +52,6 @@ import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
@@ -114,8 +113,7 @@ public class DirectProxyHandler {
if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
try {
- authData =
AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
-
config.getBrokerClientAuthenticationParameters()).getAuthData();
+ authData = authentication.getAuthData();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index ea9e4ebfaa9..5cf01d6668b 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -64,8 +64,6 @@ import
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -158,7 +156,8 @@ public class ProxyService implements Closeable {
private boolean gracefulShutdown = true;
public ProxyService(ProxyConfiguration proxyConfig,
- AuthenticationService authenticationService) throws
Exception {
+ AuthenticationService authenticationService,
+ Authentication proxyClientAuthentication) throws
Exception {
requireNonNull(proxyConfig);
this.proxyConfig = proxyConfig;
this.clientCnxs = Sets.newConcurrentHashSet();
@@ -207,12 +206,7 @@ public class ProxyService implements Closeable {
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
- if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
- proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
- proxyConfig.getBrokerClientAuthenticationParameters());
- } else {
- proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
- }
+ this.proxyClientAuthentication = proxyClientAuthentication;
this.connectionController = new
ConnectionController.DefaultConnectionController(
proxyConfig.getMaxConcurrentInboundConnections(),
proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 10121e7f5d6..a5504cac100 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -29,11 +29,13 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
import io.prometheus.client.hotspot.DefaultExports;
+import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Getter;
@@ -44,6 +46,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -104,6 +110,9 @@ public class ProxyServiceStarter {
private ProxyConfiguration config;
+ @Getter
+ private Authentication proxyClientAuthentication;
+
@Getter
private ProxyService proxyService;
@@ -244,8 +253,27 @@ public class ProxyServiceStarter {
public void start() throws Exception {
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));
+
+ if (config.getBrokerClientAuthenticationPlugin() != null) {
+ proxyClientAuthentication =
AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
+ config.getBrokerClientAuthenticationParameters());
+ Objects.requireNonNull(proxyClientAuthentication, "No supported
auth found for proxy");
+ try {
+ proxyClientAuthentication.start();
+ } catch (Exception e) {
+ try {
+ proxyClientAuthentication.close();
+ } catch (IOException ioe) {
+ log.error("Failed to close the authentication service",
ioe);
+ }
+ throw new
PulsarClientException.InvalidConfigurationException(e.getMessage());
+ }
+ } else {
+ proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
+ }
+
// create proxy service
- proxyService = new ProxyService(config, authenticationService);
+ proxyService = new ProxyService(config, authenticationService,
proxyClientAuthentication);
// create a web-service
server = new WebServer(config, authenticationService);
@@ -293,7 +321,8 @@ public class ProxyServiceStarter {
}
AtomicReference<WebSocketService> webSocketServiceRef = new
AtomicReference<>();
- addWebServerHandlers(server, config, proxyService,
proxyService.getDiscoveryProvider(), webSocketServiceRef);
+ addWebServerHandlers(server, config, proxyService,
proxyService.getDiscoveryProvider(), webSocketServiceRef,
+ proxyClientAuthentication);
webSocketService = webSocketServiceRef.get();
// start web-service
@@ -311,6 +340,9 @@ public class ProxyServiceStarter {
if (webSocketService != null) {
webSocketService.close();
}
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
@@ -323,15 +355,17 @@ public class ProxyServiceStarter {
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
- BrokerDiscoveryProvider
discoveryProvider) throws Exception {
- addWebServerHandlers(server, config, service, discoveryProvider, null);
+ BrokerDiscoveryProvider
discoveryProvider,
+ Authentication
proxyClientAuthentication) throws Exception {
+ addWebServerHandlers(server, config, service, discoveryProvider, null,
proxyClientAuthentication);
}
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider
discoveryProvider,
- AtomicReference<WebSocketService>
webSocketServiceRef) throws Exception {
+ AtomicReference<WebSocketService>
webSocketServiceRef,
+ Authentication
proxyClientAuthentication) throws Exception {
// We can make 'status.html' publicly accessible without
authentication since
// it does not contain any sensitive data.
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH,
config.getStatusFilePath(),
@@ -348,7 +382,8 @@ public class ProxyServiceStarter {
}
}
- AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config,
discoveryProvider);
+ AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config,
discoveryProvider,
+ proxyClientAuthentication);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
server.addServlet("/admin", servletHolder);
server.addServlet("/lookup", servletHolder);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index f9ace716ecd..050199acc49 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -26,6 +26,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -121,6 +123,7 @@ public abstract class SimpleProxyExtensionTestBase extends
MockedPulsarServiceBa
private ProxyService proxyService;
private boolean useSeparateThreadPoolForProxyExtensions;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
public SimpleProxyExtensionTestBase(boolean
useSeparateThreadPoolForProxyExtensions) {
this.useSeparateThreadPoolForProxyExtensions =
useSeparateThreadPoolForProxyExtensions;
@@ -142,8 +145,12 @@ public abstract class SimpleProxyExtensionTestBase extends
MockedPulsarServiceBa
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -174,6 +181,9 @@ public abstract class SimpleProxyExtensionTestBase extends
MockedPulsarServiceBa
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index 92c644b470d..5995d11b33b 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -24,6 +24,8 @@ import
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -47,6 +49,8 @@ public class AdminProxyHandlerKeystoreTLSTest extends
MockedPulsarServiceBaseTes
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
+
private WebServer webServer;
private BrokerDiscoveryProvider discoveryProvider;
@@ -103,6 +107,10 @@ public class AdminProxyHandlerKeystoreTLSTest extends
MockedPulsarServiceBaseTes
KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
resource = new PulsarResources(registerCloseable(new
ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
@@ -110,7 +118,7 @@ public class AdminProxyHandlerKeystoreTLSTest extends
MockedPulsarServiceBaseTes
discoveryProvider = spy(registerCloseable(new
BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(),
brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
- ServletHolder servletHolder = new ServletHolder(new
AdminProxyHandler(proxyConfig, discoveryProvider));
+ ServletHolder servletHolder = new ServletHolder(new
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
webServer.start();
@@ -120,6 +128,9 @@ public class AdminProxyHandlerKeystoreTLSTest extends
MockedPulsarServiceBaseTes
@Override
protected void cleanup() throws Exception {
webServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
super.internalCleanup();
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
index becebe0059e..4f925618e8a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
@@ -32,6 +32,7 @@ import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.client.api.Authentication;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.testng.Assert;
@@ -46,7 +47,7 @@ public class AdminProxyHandlerTest {
// given
HttpClient httpClient = mock(HttpClient.class);
adminProxyHandler = new
AdminProxyHandler(mock(ProxyConfiguration.class),
- mock(BrokerDiscoveryProvider.class)) {
+ mock(BrokerDiscoveryProvider.class),
mock(Authentication.class)) {
@Override
protected HttpClient createHttpClient() throws ServletException {
return httpClient;
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index ef58648e35a..97bb91d924c 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -32,6 +32,8 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -51,6 +53,7 @@ public class AuthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(AuthedAdminProxyHandlerTest.class);
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
private WebServer webServer;
private BrokerDiscoveryProvider discoveryProvider;
private PulsarResources resource;
@@ -99,6 +102,10 @@ public class AuthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
resource = new PulsarResources(registerCloseable(new
ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
@@ -107,7 +114,7 @@ public class AuthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
LoadManagerReport report = new LoadReport(brokerUrl.toString(),
brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
- ServletHolder servletHolder = new ServletHolder(new
AdminProxyHandler(proxyConfig, discoveryProvider));
+ ServletHolder servletHolder = new ServletHolder(new
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
@@ -119,6 +126,9 @@ public class AuthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
@Override
protected void cleanup() throws Exception {
webServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
super.internalCleanup();
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
index db5e9e12bd2..a07a0f082d3 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.proxy.server;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,8 +40,13 @@ public class FunctionWorkerRoutingTest {
proxyConfig.setBrokerWebServiceURL(brokerUrl);
proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl);
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
BrokerDiscoveryProvider discoveryProvider =
mock(BrokerDiscoveryProvider.class);
- AdminProxyHandler handler = new AdminProxyHandler(proxyConfig,
discoveryProvider);
+ AdminProxyHandler handler = new AdminProxyHandler(proxyConfig,
discoveryProvider, proxyClientAuthentication);
String funcUrl =
handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test"));
Assert.assertEquals(funcUrl,
String.format("%s/admin/v3/functions/%s/%s",
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
index c29bfaa9648..b7ef0855e38 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -33,7 +34,7 @@ public class InvalidProxyConfigForAuthorizationTest {
proxyConfiguration.setAuthorizationEnabled(true);
proxyConfiguration.setAuthenticationEnabled(false);
try (ProxyService proxyService = new ProxyService(proxyConfiguration,
- Mockito.mock(AuthenticationService.class))) {
+ Mockito.mock(AuthenticationService.class),
Mockito.mock(Authentication.class))) {
proxyService.start();
fail("An exception should have been thrown");
} catch (Exception e) {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index f61a73bbf91..e12224da371 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -25,6 +25,8 @@ import okhttp3.Response;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
@@ -65,6 +67,7 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private WebServer proxyWebServer;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -83,8 +86,13 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
// this is for nar package test
// addServletNar();
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
- new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+ proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -99,7 +107,7 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
mockAdditionalServlet();
proxyWebServer = new WebServer(proxyConfig, authService);
- ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig,
proxyService, null);
+ ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig,
proxyService, null, proxyClientAuthentication);
proxyWebServer.start();
}
@@ -180,6 +188,9 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
proxyService.close();
proxyWebServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 4083c984d98..2a9a9f15b45 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -35,6 +35,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -74,6 +75,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends
ProducerConsumerBase
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
private final String configClusterName = "test";
@BeforeMethod
@@ -139,8 +141,12 @@ public class ProxyAuthenticatedProducerConsumerTest
extends ProducerConsumerBase
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -152,6 +158,9 @@ public class ProxyAuthenticatedProducerConsumerTest extends
ProducerConsumerBase
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
/**
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 662b8305c0e..7d3cf57d594 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -43,6 +43,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -235,7 +236,11 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
@Cleanup
- ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService);
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+
proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ @Cleanup
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
proxyService.start();
final String proxyServiceUrl = proxyService.getServiceUrl();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 78ab9bd0d95..671e68e5c3f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -27,6 +27,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.limiter.ConnectionController;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
@@ -46,6 +48,7 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
private final int NUM_CONCURRENT_INBOUND_CONNECTION = 4;
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -60,8 +63,11 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -74,6 +80,9 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
protected void cleanup() throws Exception {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
index 5ddb084e3c7..6a3992c550f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.proxy.server;
public class ProxyDisableZeroCopyTest extends ProxyTest {
@Override
- protected void initializeProxyConfig() {
+ protected void initializeProxyConfig() throws Exception {
super.initializeProxyConfig();
proxyConfig.setProxyZeroCopyModeEnabled(false);
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 413774daf2c..40aa8f50405 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -22,6 +22,8 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -48,6 +50,7 @@ public class ProxyEnableHAProxyProtocolTest extends
MockedPulsarServiceBaseTest
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -62,8 +65,12 @@ public class ProxyEnableHAProxyProtocolTest extends
MockedPulsarServiceBaseTest
proxyConfig.setHaProxyProtocolEnabled(true);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -77,6 +84,9 @@ public class ProxyEnableHAProxyProtocolTest extends
MockedPulsarServiceBaseTest
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 5e969ca26e4..9c3a69b5f44 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -30,6 +30,8 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -118,7 +120,11 @@ public class ProxyForwardAuthDataTest extends
ProducerConsumerBase {
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
- try (ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService)) {
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ try (ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication)) {
proxyService.start();
try (PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) {
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
@@ -134,7 +140,7 @@ public class ProxyForwardAuthDataTest extends
ProducerConsumerBase {
PulsarConfigurationLoader.convertFrom(proxyConfig));
@Cleanup
- ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService);
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
proxyService.start();
@Cleanup
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
index 90e15ede2f4..cf587015544 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java
@@ -33,9 +33,12 @@ import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
+import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.eclipse.jetty.client.HttpClient;
@@ -197,10 +200,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
try {
Response r =
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -226,10 +233,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
try {
Response r1 =
client.target(webServer.getServiceUri()).path("/server1/foobar").request().get();
@@ -257,10 +268,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
}
@@ -276,10 +291,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
try {
Response r =
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -303,10 +322,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
try {
Response r =
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -329,10 +352,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
try {
Response r =
client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get();
@@ -354,6 +381,10 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
StringBuilder longUri = new StringBuilder("/service3/tp");
for (int i = 10 * 1024; i > 0; i = i - 11){
@@ -362,7 +393,7 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
WebServer webServerMaxUriLen8k = new WebServer(proxyConfig,
authService);
ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k,
proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServerMaxUriLen8k.start();
try {
Response r =
client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get();
@@ -374,7 +405,7 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024);
WebServer webServerMaxUriLen12k = new WebServer(proxyConfig,
authService);
ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k,
proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServerMaxUriLen12k.start();
try {
Response r =
client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get();
@@ -395,10 +426,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
try {
Response r =
client.target(webServer.getServiceUri()).path("/ui/foobar").request().get();
@@ -427,10 +462,14 @@ public class ProxyIsAHttpProxyTest extends
MockedPulsarServiceBaseTest {
ProxyConfiguration proxyConfig =
PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
HttpClient httpClient = new HttpClient();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5671c527f68..8aa5581a0fe 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -24,6 +24,8 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -40,6 +42,7 @@ import org.testng.annotations.Test;
public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest
{
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeMethod
@@ -87,9 +90,13 @@ public class ProxyKeyStoreTlsTransportTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
proxyConfig.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -103,6 +110,9 @@ public class ProxyKeyStoreTlsTransportTest extends
MockedPulsarServiceBaseTest {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
protected PulsarClient newClient() throws Exception {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
index 99fb8c03a81..2c6d080bf2c 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
@@ -33,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -54,6 +56,7 @@ import org.testng.annotations.Test;
public class ProxyKeyStoreTlsWithAuthTest extends MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeMethod
@@ -88,9 +91,13 @@ public class ProxyKeyStoreTlsWithAuthTest extends
MockedPulsarServiceBaseTest {
providers.add(AuthenticationProviderTls.class.getName());
proxyConfig.setAuthenticationProviders(providers);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -104,6 +111,9 @@ public class ProxyKeyStoreTlsWithAuthTest extends
MockedPulsarServiceBaseTest {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
protected PulsarClient internalSetUpForClient(boolean addCertificates,
String lookupUrl) throws Exception {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
index 1dcebda7935..3a20273b8c0 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
@@ -29,6 +29,8 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -50,6 +52,7 @@ import org.testng.annotations.Test;
public class ProxyKeyStoreTlsWithoutAuthTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeMethod
@@ -76,8 +79,12 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends
MockedPulsarServiceBaseTest
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -109,6 +116,9 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends
MockedPulsarServiceBaseTest
protected void cleanup() throws Exception {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index a9017404d0e..4d12fdd77e7 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -31,6 +31,8 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
@@ -53,6 +55,7 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5;
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeMethod(alwaysRun = true)
@@ -69,7 +72,10 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
- proxyService = Mockito.spy(new ProxyService(proxyConfig,
authenticationService));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ proxyService = Mockito.spy(new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -84,6 +90,9 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
if (proxyService != null) {
proxyService.close();
}
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test(groups = "quarantine")
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index fae44c00ada..ab428c31b7f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -48,6 +50,7 @@ public class ProxyMutualTlsTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -68,8 +71,12 @@ public class ProxyMutualTlsTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setTlsAllowInsecureConnection(false);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -83,6 +90,9 @@ public class ProxyMutualTlsTest extends
MockedPulsarServiceBaseTest {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 1a9459619eb..583ab7000e5 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -62,6 +64,7 @@ public class ProxyParserTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -75,9 +78,12 @@ public class ProxyParserTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setClusterName(configClusterName);
//enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -93,6 +99,9 @@ public class ProxyParserTest extends
MockedPulsarServiceBaseTest {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
index b692987d17a..4dd7bc981e5 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -42,6 +42,8 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.awaitility.Awaitility;
@@ -59,6 +61,7 @@ public class ProxyPrometheusMetricsTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private WebServer proxyWebServer;
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -72,8 +75,13 @@ public class ProxyPrometheusMetricsTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(TEST_CLUSTER);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
- new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+ proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -86,7 +94,7 @@ public class ProxyPrometheusMetricsTest extends
MockedPulsarServiceBaseTest {
PulsarConfigurationLoader.convertFrom(proxyConfig));
proxyWebServer = new WebServer(proxyConfig, authService);
- ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig,
proxyService, null);
+ ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig,
proxyService, null, proxyClientAuthentication);
proxyWebServer.start();
}
@@ -109,6 +117,9 @@ public class ProxyPrometheusMetricsTest extends
MockedPulsarServiceBaseTest {
if (proxyService != null) {
proxyService.close();
}
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
/**
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index d06cf4201ff..bdabfecaa43 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -35,6 +35,8 @@ import
org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientCnx;
@@ -57,6 +59,7 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
private ProxyService proxyService;
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
protected void doInitConf() throws Exception {
@@ -127,9 +130,13 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
properties.setProperty("tokenSecretKey",
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
proxyConfig.setProperties(properties);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
}
@AfterClass(alwaysRun = true)
@@ -137,6 +144,9 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
private void startProxy(boolean forwardAuthData) throws Exception {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index a1ffc13ee93..883b725e15d 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import javax.naming.AuthenticationException;
+import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -35,6 +36,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -219,9 +221,14 @@ public class ProxyRolesEnforcementTest extends
ProducerConsumerBase {
providers.add(BasicAuthenticationProvider.class.getName());
proxyConfig.setAuthenticationProviders(providers);
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
try (ProxyService proxyService = new ProxyService(proxyConfig,
new AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig)))) {
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication)) {
proxyService.start();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 0b9b6f17d12..d96d2cd1f6e 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -20,16 +20,22 @@ package org.apache.pulsar.proxy.server;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Base64;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
+import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.client.HttpClient;
@@ -160,4 +166,89 @@ public class ProxyServiceStarterTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testProxyClientAuthentication() throws Exception {
+ final Consumer<ProxyConfiguration> initConfig = (proxyConfig) -> {
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setWebSocketServiceEnabled(true);
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setClusterName(configClusterName);
+ };
+
+
+
+ ProxyServiceStarter serviceStarter = new ProxyServiceStarter(ARGS,
null, true);
+ initConfig.accept(serviceStarter.getConfig());
+ // ProxyServiceStarter will throw an exception when
Authentication#start is failed
+
serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication1.class.getName());
+ try {
+ serviceStarter.start();
+ fail("ProxyServiceStarter should throw an exception when
Authentication#start is failed");
+ } catch (Exception ex) {
+
assertTrue(ex.getMessage().contains("ExceptionAuthentication1#start"));
+ assertTrue(serviceStarter.getProxyClientAuthentication()
instanceof ExceptionAuthentication1);
+ }
+
+ serviceStarter = new ProxyServiceStarter(ARGS, null, true);
+ initConfig.accept(serviceStarter.getConfig());
+ // ProxyServiceStarter will throw an exception when
Authentication#start and Authentication#close are failed
+
serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication2.class.getName());
+ try {
+ serviceStarter.start();
+ fail("ProxyServiceStarter should throw an exception when
Authentication#start and Authentication#close are failed");
+ } catch (Exception ex) {
+
assertTrue(ex.getMessage().contains("ExceptionAuthentication2#start"));
+ assertTrue(serviceStarter.getProxyClientAuthentication()
instanceof ExceptionAuthentication2);
+ }
+ }
+
+ public static class ExceptionAuthentication1 implements Authentication {
+
+ @Override
+ public String getAuthMethodName() {
+ return
"org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication1";
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ // no-op
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ throw new PulsarClientException("ExceptionAuthentication1#start");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+ }
+
+ public static class ExceptionAuthentication2 implements Authentication {
+
+ @Override
+ public String getAuthMethodName() {
+ return
"org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication2";
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ // no-op
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ throw new PulsarClientException("ExceptionAuthentication2#start");
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new IOException("ExceptionAuthentication2#close");
+ }
+ }
+
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 2866c6c2690..86d572702f3 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -38,6 +38,8 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -61,6 +63,7 @@ public class ProxyStatsTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private WebServer proxyWebServer;
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -76,8 +79,12 @@ public class ProxyStatsTest extends
MockedPulsarServiceBaseTest {
// enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.of(2));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
- new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -90,7 +97,7 @@ public class ProxyStatsTest extends
MockedPulsarServiceBaseTest {
PulsarConfigurationLoader.convertFrom(proxyConfig));
proxyWebServer = new WebServer(proxyConfig, authService);
- ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig,
proxyService, null);
+ ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig,
proxyService, null, proxyClientAuthentication);
proxyWebServer.start();
}
@@ -109,6 +116,9 @@ public class ProxyStatsTest extends
MockedPulsarServiceBaseTest {
internalCleanup();
proxyService.close();
proxyWebServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
/**
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 6e66008c15a..30c6e45654b 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -28,6 +28,8 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
@@ -56,6 +58,7 @@ public class ProxyStuckConnectionTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig;
+ private Authentication proxyClientAuthentication;
private SocatContainer socatContainer;
private String brokerServiceUriSocat;
@@ -81,6 +84,10 @@ public class ProxyStuckConnectionTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
startProxyService();
// use the same port for subsequent restarts
proxyConfig.setServicePort(proxyService.getListenPort());
@@ -88,7 +95,7 @@ public class ProxyStuckConnectionTest extends
MockedPulsarServiceBaseTest {
private void startProxyService() throws Exception {
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))) {
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication) {
@Override
protected LookupProxyHandler newLookupProxyHandler(ProxyConnection
proxyConnection) {
return new TestLookupProxyHandler(this, proxyConnection);
@@ -107,6 +114,9 @@ public class ProxyStuckConnectionTest extends
MockedPulsarServiceBaseTest {
if (proxyService != null) {
proxyService.close();
}
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
if (socatContainer != null) {
socatContainer.close();
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index e1e49f9e8c5..e101eb4ff7a 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -38,6 +38,8 @@ import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -74,6 +76,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
protected ProxyService proxyService;
protected ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ protected Authentication proxyClientAuthentication;
@Data
@ToString
@@ -94,7 +97,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
initializeProxyConfig();
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -102,12 +105,16 @@ public class ProxyTest extends
MockedPulsarServiceBaseTest {
proxyService.start();
}
- protected void initializeProxyConfig() {
+ protected void initializeProxyConfig() throws Exception {
proxyConfig.setServicePort(Optional.ofNullable(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);
+
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
}
@Override
@@ -116,6 +123,9 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 4e300d39741..0f0dc30b620 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -45,6 +47,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest
{
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@Override
@BeforeClass
@@ -63,8 +66,12 @@ public class ProxyTlsTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -78,6 +85,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest
{
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
index 16f610d6d0a..42b5ae178d3 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
@@ -27,6 +27,8 @@ import java.util.Optional;
import org.apache.pulsar.broker.auth.MockOIDCIdentityProvider;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.mockito.Mockito;
@@ -38,6 +40,7 @@ public class ProxyTlsWithAuthTest extends
MockedPulsarServiceBaseTest {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
private MockOIDCIdentityProvider server;
@@ -75,8 +78,12 @@ public class ProxyTlsWithAuthTest extends
MockedPulsarServiceBaseTest {
" \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}");
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -89,6 +96,9 @@ public class ProxyTlsWithAuthTest extends
MockedPulsarServiceBaseTest {
protected void cleanup() throws Exception {
internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
server.stop();
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index cf9ad5831ec..92a54aa12fd 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -34,6 +34,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -73,6 +74,7 @@ public class ProxyWithAuthorizationNegTest extends
ProducerConsumerBase {
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@BeforeMethod
@Override
@@ -138,7 +140,10 @@ public class ProxyWithAuthorizationNegTest extends
ProducerConsumerBase {
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
- proxyService = Mockito.spy(new ProxyService(proxyConfig,
authenticationService));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ proxyService = Mockito.spy(new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication));
proxyService.start();
}
@@ -148,6 +153,9 @@ public class ProxyWithAuthorizationNegTest extends
ProducerConsumerBase {
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
/**
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index bc96c7ea510..51f42ea0771 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -38,6 +38,7 @@ import
org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -87,6 +88,7 @@ public class ProxyWithAuthorizationTest extends
ProducerConsumerBase {
private ProxyService proxyService;
private WebServer webServer;
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@DataProvider(name = "hostnameVerification")
public Object[][] hostnameVerificationCodecProvider() {
@@ -230,7 +232,10 @@ public class ProxyWithAuthorizationTest extends
ProducerConsumerBase {
AuthenticationService authService =
new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig));
- proxyService = Mockito.spy(new ProxyService(proxyConfig, authService));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ proxyService = Mockito.spy(new ProxyService(proxyConfig, authService,
proxyClientAuthentication));
proxyService.setGracefulShutdown(false);
webServer = new WebServer(proxyConfig, authService);
}
@@ -241,11 +246,14 @@ public class ProxyWithAuthorizationTest extends
ProducerConsumerBase {
super.internalCleanup();
proxyService.close();
webServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
private void startProxy() throws Exception {
proxyService.start();
- ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
proxyService, null);
+ ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
proxyService, null, proxyClientAuthentication);
webServer.start();
}
@@ -459,10 +467,15 @@ public class ProxyWithAuthorizationTest extends
ProducerConsumerBase {
proxyConfig.setTlsProtocols(tlsProtocols);
proxyConfig.setTlsCiphers(tlsCiphers);
+ @Cleanup
+ final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
@Cleanup
ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig,
new
AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
proxyService.setGracefulShutdown(false);
try {
proxyService.start();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index d3c05fec721..3567c8264f1 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -49,6 +49,8 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -75,6 +77,7 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
private static final int TEST_TIMEOUT_MS = 30_000;
+ private Authentication proxyClientAuthentication;
private ProxyService proxyService;
@Override
@@ -150,8 +153,11 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
@BeforeMethod(alwaysRun = true)
public void proxySetup() throws Exception {
var proxyConfig = initializeProxyConfig();
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
- PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
@@ -163,6 +169,9 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
if (proxyService != null) {
proxyService.close();
}
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test(timeOut = TEST_TIMEOUT_MS)
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 5fb3e046824..63929ee72e4 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -39,6 +39,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
@@ -83,6 +84,7 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
private ProxyService proxyService;
private WebServer webServer;
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@BeforeMethod
@Override
@@ -130,7 +132,10 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
AuthenticationService authService =
new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig));
- proxyService = Mockito.spy(new ProxyService(proxyConfig, authService));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ proxyService = Mockito.spy(new ProxyService(proxyConfig, authService,
proxyClientAuthentication));
webServer = new WebServer(proxyConfig, authService);
}
@@ -140,11 +145,14 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
super.internalCleanup();
proxyService.close();
webServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
private void startProxy() throws Exception {
proxyService.start();
- ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
proxyService, null);
+ ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
proxyService, null, proxyClientAuthentication);
webServer.start();
}
@@ -425,7 +433,7 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
PulsarConfigurationLoader.convertFrom(proxyConfig));
final WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
proxyService,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
@Cleanup
final Client client = javax.ws.rs.client.ClientBuilder
@@ -450,7 +458,7 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
proxyConfig.setAuthenticateMetricsEndpoint(false);
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
proxyService,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
@Cleanup
Client client = javax.ws.rs.client.ClientBuilder.newClient(new
ClientConfig().register(LoggingFeature.class));
@@ -463,7 +471,7 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
proxyConfig.setAuthenticateMetricsEndpoint(true);
webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig,
proxyService,
- registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)));
+ registerCloseable(new BrokerDiscoveryProvider(proxyConfig,
resource)), proxyClientAuthentication);
webServer.start();
try {
Response r =
client.target(webServer.getServiceUri()).path("/metrics").request().get();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 9d9490e74b5..885064b8e74 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -33,6 +33,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -57,6 +58,7 @@ public class ProxyWithoutServiceDiscoveryTest extends
ProducerConsumerBase {
private static final String CLUSTER_NAME = "without-service-discovery";
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
@BeforeMethod
@@ -122,9 +124,13 @@ public class ProxyWithoutServiceDiscoveryTest extends
ProducerConsumerBase {
proxyConfig.setAuthenticationProviders(providers);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
new AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+
PulsarConfigurationLoader.convertFrom(proxyConfig)),
proxyClientAuthentication));
proxyService.start();
}
@@ -134,6 +140,9 @@ public class ProxyWithoutServiceDiscoveryTest extends
ProducerConsumerBase {
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
/**
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index 57522186c8f..71025ed484f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -32,6 +32,8 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -47,6 +49,7 @@ import org.testng.annotations.Test;
public class SuperUserAuthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
private WebServer webServer;
private BrokerDiscoveryProvider discoveryProvider;
private PulsarResources resource;
@@ -94,6 +97,10 @@ public class SuperUserAuthedAdminProxyHandlerTest extends
MockedPulsarServiceBas
proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
resource = new PulsarResources(registerCloseable(new
ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
@@ -102,7 +109,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends
MockedPulsarServiceBas
LoadManagerReport report = new LoadReport(brokerUrl.toString(),
brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
- ServletHolder servletHolder = new ServletHolder(new
AdminProxyHandler(proxyConfig, discoveryProvider));
+ ServletHolder servletHolder = new ServletHolder(new
AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
@@ -114,6 +121,9 @@ public class SuperUserAuthedAdminProxyHandlerTest extends
MockedPulsarServiceBas
@Override
protected void cleanup() throws Exception {
webServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
super.internalCleanup();
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index fe8b1f45385..0b597b93354 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -35,6 +35,8 @@ import
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -49,6 +51,7 @@ import org.testng.annotations.Test;
public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest
{
private final String STATUS_FILE_PATH =
"./src/test/resources/vip_status.html";
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private Authentication proxyClientAuthentication;
private WebServer webServer;
private BrokerDiscoveryProvider discoveryProvider;
private AdminProxyWrapper adminProxyHandler;
@@ -77,13 +80,17 @@ public class UnauthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
resource = new PulsarResources(registerCloseable(new
ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
discoveryProvider = spy(registerCloseable(new
BrokerDiscoveryProvider(proxyConfig, resource)));
- adminProxyHandler = new AdminProxyWrapper(proxyConfig,
discoveryProvider);
+ adminProxyHandler = new AdminProxyWrapper(proxyConfig,
discoveryProvider, proxyClientAuthentication);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
@@ -101,6 +108,9 @@ public class UnauthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
protected void cleanup() throws Exception {
internalCleanup();
webServer.stop();
+ if (proxyClientAuthentication != null) {
+ proxyClientAuthentication.close();
+ }
}
@Test
@@ -128,8 +138,8 @@ public class UnauthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
static class AdminProxyWrapper extends AdminProxyHandler {
String rewrittenUrl;
- AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider
discoveryProvider) {
- super(config, discoveryProvider);
+ AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider
discoveryProvider, Authentication proxyClientAuthentication) {
+ super(config, discoveryProvider, proxyClientAuthentication);
}
@Override