This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 18b2a20 Start Pulsar in TLS Only mode and deprecate tlsEnabled flag. (#2988) 18b2a20 is described below commit 18b2a20d970e1241e0e78149b30bda4780042cf7 Author: Jai Asher <j...@ccs.neu.edu> AuthorDate: Sat Dec 15 17:37:02 2018 -0800 Start Pulsar in TLS Only mode and deprecate tlsEnabled flag. (#2988) ### Motivation Start Pulsar services (broker, proxy, websocket, discovery) in TLS only mode, so that they only listen on TLS ports. Once TlsPort is set tlsEnabled flag becomes redundant information - hence getting rid of the flag in relevant components. ### Modifications a. Changed the Ports to Option<Integer> in the configuration file. b. In Websocket Service there was a bug where we used 'tlsEnabled' flag to start listening on a TLS port and to talk to broker in on serviceUrlTls - separated the flag into two (tlsEnabled and brokerClientTlsEnabled) and deprecated tlsEnabled. c. Fixed a lot of tests which relied on tlsEnabled flag. ### Result Brokers can now listen to TLS only port. --- .gitignore | 1 + conf/broker.conf | 10 +++---- conf/discovery.conf | 6 ++-- conf/proxy.conf | 6 ++-- conf/websocket.conf | 5 ++-- .../apache/pulsar/broker/ServiceConfiguration.java | 25 ++++++++++++---- .../PulsarConfigurationLoaderTest.java | 10 +++---- .../org/apache/pulsar/PulsarBrokerStarter.java | 10 +++---- .../java/org/apache/pulsar/PulsarStandalone.java | 10 +++---- .../broker/MessagingServiceShutdownHook.java | 2 +- .../org/apache/pulsar/broker/PulsarService.java | 26 +++++++++++------ .../apache/pulsar/broker/admin/AdminResource.java | 4 +-- .../pulsar/broker/loadbalance/NoopLoadManager.java | 2 +- .../loadbalance/impl/ModularLoadManagerImpl.java | 2 +- .../loadbalance/impl/SimpleLoadManagerImpl.java | 4 +-- .../pulsar/broker/namespace/NamespaceService.java | 21 +++++++++---- .../pulsar/broker/service/BrokerService.java | 30 +++++++++---------- .../pulsar/broker/web/PulsarWebResource.java | 2 +- .../org/apache/pulsar/broker/web/WebService.java | 17 +++++++---- .../apache/pulsar/compaction/CompactorTool.java | 15 ++++++---- .../apache/pulsar/broker/SLAMonitoringTest.java | 4 +-- .../apache/pulsar/broker/admin/AdminApiTest.java | 1 - .../apache/pulsar/broker/admin/AdminApiTest2.java | 2 +- .../pulsar/broker/admin/AdminApiTlsAuthTest.java | 1 - .../broker/admin/BrokerAdminClientTlsAuthTest.java | 1 - .../pulsar/broker/admin/v1/V1_AdminApiTest.java | 1 - .../AntiAffinityNamespaceGroupTest.java | 2 +- .../broker/loadbalance/LoadBalancerTest.java | 2 +- .../loadbalance/SimpleLoadManagerImplTest.java | 2 +- .../broker/namespace/OwnershipCacheTest.java | 3 +- .../pulsar/broker/service/BrokerServiceTest.java | 5 ---- .../pulsar/broker/service/PersistentTopicTest.java | 4 +-- .../pulsar/broker/service/ReplicatorTestBase.java | 2 -- .../broker/service/v1/V1_ReplicatorTestBase.java | 2 -- .../apache/pulsar/broker/web/WebServiceTest.java | 5 ++-- .../api/AuthenticatedProducerConsumerTest.java | 1 - .../AuthenticationTlsHostnameVerificationTest.java | 1 - .../pulsar/client/api/BrokerServiceLookupTest.java | 18 +++++------- .../pulsar/client/api/ServiceUrlProviderTest.java | 2 ++ .../pulsar/client/api/TlsProducerConsumerBase.java | 1 - .../common/naming/ServiceConfigurationTest.java | 3 +- .../worker/PulsarWorkerAssignmentTest.java | 4 +-- .../apache/pulsar/io/PulsarFunctionAdminTest.java | 5 ++-- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 5 ++-- .../apache/pulsar/io/PulsarFunctionTlsTest.java | 5 ++-- .../websocket/proxy/ProxyPublishConsumeTest.java | 2 +- .../proxy/ProxyPublishConsumeTlsTest.java | 2 +- .../pulsar/discovery/service/DiscoveryService.java | 34 +++++++++++++++------- .../discovery/service/server/ServerManager.java | 18 +++++------- .../discovery/service/server/ServiceConfig.java | 31 +++++++++++--------- .../discovery/service/BaseDiscoveryTestSetup.java | 1 - .../service/web/DiscoveryServiceWebTest.java | 1 - .../pulsar/functions/worker/WorkerConfig.java | 12 ++++++-- .../pulsar/functions/worker/rest/WorkerServer.java | 2 +- .../worker/WorkerApiV2ResourceConfigTest.java | 4 +-- .../pulsar/proxy/server/ProxyConfiguration.java | 33 +++++++++++++++------ .../apache/pulsar/proxy/server/ProxyService.java | 30 +++++++++++++------ .../org/apache/pulsar/proxy/server/WebServer.java | 15 +++++----- .../proxy/server/AuthedAdminProxyHandlerTest.java | 4 +-- .../ProxyAuthenticatedProducerConsumerTest.java | 4 +-- .../proxy/server/ProxyAuthenticationTest.java | 1 - .../server/ProxyConnectionThrottlingTest.java | 4 +-- .../proxy/server/ProxyForwardAuthDataTest.java | 1 - .../proxy/server/ProxyLookupThrottlingTest.java | 2 +- .../proxy/server/ProxyRolesEnforcementTest.java | 1 - .../org/apache/pulsar/proxy/server/ProxyTest.java | 10 +++---- .../apache/pulsar/proxy/server/ProxyTlsTest.java | 5 ++-- .../server/ProxyWithAuthorizationNegTest.java | 4 +-- .../proxy/server/ProxyWithAuthorizationTest.java | 11 +++---- .../server/ProxyWithoutServiceDiscoveryTest.java | 4 +-- .../SuperUserAuthedAdminProxyHandlerTest.java | 4 +-- .../server/UnauthedAdminProxyHandlerTest.java | 6 ++-- .../apache/pulsar/websocket/WebSocketService.java | 12 ++++---- .../pulsar/websocket/service/ProxyServer.java | 17 +++++------ .../service/WebSocketProxyConfiguration.java | 25 +++++++++------- .../pulsar/websocket/LookupProtocolTest.java | 6 ++-- 76 files changed, 325 insertions(+), 271 deletions(-) diff --git a/.gitignore b/.gitignore index e43e519..f605f1e 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar .project .settings/ .recommenders/ +.factorypath # Intellij .idea/ diff --git a/conf/broker.conf b/conf/broker.conf index 85bbb26..2ff87a3 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -28,14 +28,14 @@ configurationStoreServers= # Broker data port brokerServicePort=6650 -# Broker data port for TLS -brokerServicePortTls=6651 +# Broker data port for TLS - By default TLS is disabled +brokerServicePortTls= # Port to use to server HTTP request webServicePort=8080 -# Port to use to server HTTPS request -webServicePortTls=8443 +# Port to use to server HTTPS request - By default TLS is disabled +webServicePortTls= # Hostname or IP address the service binds on, default is 0.0.0.0. bindAddress=0.0.0.0 @@ -231,7 +231,7 @@ proxyRoles= # else it just accepts the originalPrincipal and authorizes it (if required). authenticateOriginalAuthData=false -# Enable TLS +# Deprecated - Use webServicePortTls and brokerServicePortTls instead tlsEnabled=false # Path for the TLS certificate file diff --git a/conf/discovery.conf b/conf/discovery.conf index b1b6f41..907f546 100644 --- a/conf/discovery.conf +++ b/conf/discovery.conf @@ -30,13 +30,13 @@ zookeeperSessionTimeoutMs=30000 servicePort=6650 # Port to use to server binary-proto-tls request -servicePortTls=6651 +servicePortTls= # Port that discovery service listen on webServicePort=8080 # Port to use to server HTTPS request -webServicePortTls=8443 +webServicePortTls= # Control whether to bind directly on localhost rather than on normal hostname bindOnLocalhost=false @@ -65,7 +65,7 @@ superUserRoles= authorizationAllowWildcardsMatching=false ##### --- TLS --- ##### -# Enable TLS +# Deprecated - Use servicePortTls and webServicePortTls instead tlsEnabled=false # Path for the TLS certificate file diff --git a/conf/proxy.conf b/conf/proxy.conf index 2d17aa5..3c3b027 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -47,13 +47,13 @@ zookeeperSessionTimeoutMs=30000 servicePort=6650 # The port to use to server binary Protobuf TLS requests -servicePortTls=6651 +servicePortTls= # Port that discovery service listen on webServicePort=8080 # Port to use to server HTTPS request -webServicePortTls=8443 +webServicePortTls= # Path for the file used to determine the rotation status for the proxy instance when responding # to service discovery health checks @@ -110,7 +110,7 @@ maxConcurrentLookupRequests=50000 ##### --- TLS --- ##### -# Whether TLS is enabled for the proxy +# Deprecated - use servicePortTls and webServicePortTls instead tlsEnabledInProxy=false # Path for the TLS certificate file diff --git a/conf/websocket.conf b/conf/websocket.conf index c8144fd..ab03e56 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -34,7 +34,7 @@ brokerServiceUrlTls= # Port to use to server HTTP request webServicePort=8080 # Port to use to server HTTPS request -webServicePortTls=8443 +webServicePortTls= # Path for the file used to determine the rotation status for the proxy-instance when responding # to service discovery health checks @@ -79,6 +79,7 @@ authorizationAllowWildcardsMatching=false superUserRoles= # Authentication settings of the proxy itself. Used to connect to brokers +brokerClientTlsEnabled=false; brokerClientAuthenticationPlugin= brokerClientAuthenticationParameters= brokerClientTrustCertsFilePath= @@ -88,7 +89,7 @@ anonymousUserRole= ### --- TLS --- ### -# Enable TLS +# Deprecated - use webServicePortTls and brokerClientTlsEnabled instead tlsEnabled=false # Accept untrusted TLS certificate from client diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 6e7b168..3939ceb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -52,12 +52,12 @@ public class ServiceConfiguration implements PulsarConfiguration { // Configuration Store connection string @FieldContext(required = false) private String configurationStoreServers; - private int brokerServicePort = 6650; - private int brokerServicePortTls = 6651; + private Integer brokerServicePort = 6650; + private Integer brokerServicePortTls = null; // Port to use to server HTTP request - private int webServicePort = 8080; + private Integer webServicePort = 8080; // Port to use to server HTTPS request - private int webServicePortTls = 8443; + private Integer webServicePortTls = null; // Hostname or IP address the service binds on. private String bindAddress = "0.0.0.0"; @@ -228,7 +228,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private int maxConsumersPerSubscription = 0; /***** --- TLS --- ****/ - // Enable TLS + @Deprecated private boolean tlsEnabled = false; // Path for the TLS certificate file private String tlsCertificateFilePath; @@ -589,5 +589,20 @@ public class ServiceConfiguration implements PulsarConfiguration { public int getBookkeeperHealthCheckIntervalSec() { return (int) bookkeeperClientHealthCheckIntervalSeconds; } + + public Optional<Integer> getBrokerServicePort() { + return Optional.ofNullable(brokerServicePort); + } + + public Optional<Integer> getBrokerServicePortTls() { + return Optional.ofNullable(brokerServicePortTls); + } + + public Optional<Integer> getWebServicePort() { + return Optional.ofNullable(webServicePort); + } + public Optional<Integer> getWebServicePortTls() { + return Optional.ofNullable(webServicePortTls); + } } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java index baa0e75..3459086 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java @@ -67,10 +67,10 @@ public class PulsarConfigurationLoaderTest { // check whether converting correctly assertEquals(serviceConfiguration.getZookeeperServers(), "localhost:2181"); assertEquals(serviceConfiguration.getConfigurationStoreServers(), "localhost:2184"); - assertEquals(serviceConfiguration.getBrokerServicePort(), 7650); - assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651); - assertEquals(serviceConfiguration.getWebServicePort(), 9080); - assertEquals(serviceConfiguration.getWebServicePortTls(), 9443); + assertEquals(serviceConfiguration.getBrokerServicePort().get(), new Integer(7650)); + assertEquals(serviceConfiguration.getBrokerServicePortTls().get(), new Integer(7651)); + assertEquals(serviceConfiguration.getWebServicePort().get(), new Integer(9080)); + assertEquals(serviceConfiguration.getWebServicePortTls().get(), new Integer(9443)); // check whether exception causes try { @@ -112,7 +112,7 @@ public class PulsarConfigurationLoaderTest { assertEquals(serviceConfig.getBacklogQuotaDefaultLimitGB(), 18); assertEquals(serviceConfig.getClusterName(), "usc"); assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role"); - assertEquals(serviceConfig.getBrokerServicePort(), 7777); + assertEquals(serviceConfig.getBrokerServicePort().get(), new Integer(7777)); assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java index 0af322f..1cf1b5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java @@ -147,17 +147,17 @@ public class PulsarBrokerStarter { boolean useTls = workerConfig.isUseTls(); String localhost = "127.0.0.1"; String pulsarServiceUrl = useTls - ? PulsarService.brokerUrlTls(localhost, brokerConfig.getBrokerServicePortTls()) - : PulsarService.brokerUrl(localhost, brokerConfig.getBrokerServicePort()); + ? PulsarService.brokerUrlTls(localhost, brokerConfig.getBrokerServicePortTls().get()) + : PulsarService.brokerUrl(localhost, brokerConfig.getBrokerServicePort().get()); String webServiceUrl = useTls - ? PulsarService.webAddressTls(localhost, brokerConfig.getWebServicePortTls()) - : PulsarService.webAddress(localhost, brokerConfig.getWebServicePort()); + ? PulsarService.webAddressTls(localhost, brokerConfig.getWebServicePortTls().get()) + : PulsarService.webAddress(localhost, brokerConfig.getWebServicePort().get()); workerConfig.setPulsarServiceUrl(pulsarServiceUrl); workerConfig.setPulsarWebServiceUrl(webServiceUrl); String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress( brokerConfig.getAdvertisedAddress()); workerConfig.setWorkerHostname(hostname); - workerConfig.setWorkerPort(brokerConfig.getWebServicePort()); + workerConfig.setWorkerPort(brokerConfig.getWebServicePort().get()); workerConfig.setWorkerId( "c-" + brokerConfig.getClusterName() + "-fw-" + hostname diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index d1f37c9..8711bd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -272,8 +272,8 @@ public class PulsarStandalone implements AutoCloseable { workerConfig = WorkerConfig.load(this.getFnWorkerConfigFile()); } // worker talks to local broker - workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort()); - workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort()); + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get()); + workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get()); if (!this.isNoStreamStorage()) { // only set the state storage service url when state is enabled. workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + this.getStreamStoragePort()); @@ -281,7 +281,7 @@ public class PulsarStandalone implements AutoCloseable { String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress( config.getAdvertisedAddress()); workerConfig.setWorkerHostname(hostname); - workerConfig.setWorkerPort(config.getWebServicePort()); + workerConfig.setWorkerPort(config.getWebServicePort().get()); workerConfig.setWorkerId( "c-" + config.getClusterName() + "-fw-" + hostname @@ -294,9 +294,9 @@ public class PulsarStandalone implements AutoCloseable { broker.start(); URL webServiceUrl = new URL( - String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort())); + String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort().get())); final String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(), - config.getBrokerServicePort()); + config.getBrokerServicePort().get()); admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication( config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java index 1f1007e..6cacaa5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java @@ -49,7 +49,7 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ public void run() { if (service.getConfiguration() != null) { LOG.info("messaging service shutdown hook started, lookup port=" - + service.getConfiguration().getWebServicePort() + ", broker url=" + service.getBrokerServiceUrl()); + + service.getConfiguration().getWebServicePort().get() + ", broker url=" + service.getBrokerServiceUrl()); } ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 4f5bd72..74e91be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -470,7 +470,7 @@ public class PulsarService implements AutoCloseable { this.startWorkerService(); LOG.info("messaging service is ready, bootstrap service on port={}, broker url={}, cluster={}, configs={}", - config.getWebServicePort(), brokerServiceUrl, config.getClusterName(), + config.getWebServicePort().get(), brokerServiceUrl, config.getClusterName(), ReflectionToStringBuilder.toString(config)); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -855,7 +855,11 @@ public class PulsarService implements AutoCloseable { } public static String brokerUrl(ServiceConfiguration config) { - return brokerUrl(advertisedAddress(config), config.getBrokerServicePort()); + if (config.getBrokerServicePort().isPresent()) { + return brokerUrl(advertisedAddress(config), config.getBrokerServicePort().get()); + } else { + return null; + } } public static String brokerUrl(String host, int port) { @@ -863,10 +867,10 @@ public class PulsarService implements AutoCloseable { } public static String brokerUrlTls(ServiceConfiguration config) { - if (config.isTlsEnabled()) { - return brokerUrlTls(advertisedAddress(config), config.getBrokerServicePortTls()); + if (config.getBrokerServicePortTls().isPresent()) { + return brokerUrlTls(advertisedAddress(config), config.getBrokerServicePortTls().get()); } else { - return ""; + return null; } } @@ -875,7 +879,11 @@ public class PulsarService implements AutoCloseable { } public static String webAddress(ServiceConfiguration config) { - return webAddress(advertisedAddress(config), config.getWebServicePort()); + if (config.getWebServicePort().isPresent()) { + return webAddress(advertisedAddress(config), config.getWebServicePort().get()); + } else { + return null; + } } public static String webAddress(String host, int port) { @@ -883,10 +891,10 @@ public class PulsarService implements AutoCloseable { } public static String webAddressTls(ServiceConfiguration config) { - if (config.isTlsEnabled()) { - return webAddressTls(advertisedAddress(config), config.getWebServicePortTls()); + if (config.getWebServicePortTls().isPresent()) { + return webAddressTls(advertisedAddress(config), config.getWebServicePortTls().get()); } else { - return ""; + return null; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index a76d25e..6dc2e40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -293,8 +293,8 @@ public abstract class AdminResource extends PulsarWebResource { protected void validateBrokerName(String broker) throws MalformedURLException { String brokerUrl = String.format("http://%s", broker); String brokerUrlTls = String.format("https://%s", broker); - if (!pulsar().getWebServiceAddress().equals(brokerUrl) - && !pulsar().getWebServiceAddressTls().equals(brokerUrlTls)) { + if (!brokerUrl.equals(pulsar().getWebServiceAddress()) + && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) { String[] parts = broker.split(":"); checkArgument(parts.length == 2, "Invalid broker url %s", broker); String host = parts[0]; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index 5773c61..087eb28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -54,7 +54,7 @@ public class NoopLoadManager implements LoadManager { @Override public void initialize(PulsarService pulsar) { - lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort(); + lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get(); localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress), new PulsarResourceDescription()); zkClient = pulsar.getZkClient(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 45a31f5..997d9bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -783,7 +783,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach // Register the brokers in zk list createZPathIfNotExists(zkClient, LoadManager.LOADBALANCE_BROKERS_ROOT); - String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort(); + String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get(); brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress; updateLocalBrokerData(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index da4c0e3..023ce3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -283,7 +283,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene // ignore the exception, node might be present already } } - String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort(); + String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort().get(); brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; LoadReport loadReport = null; try { @@ -1113,7 +1113,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(), - pulsar.getConfiguration().getWebServicePort())); + pulsar.getConfiguration().getWebServicePort().get())); loadReport.setBrokerVersionString(pulsar.getBrokerVersion()); SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ab8aef7..9b4fcae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -487,7 +487,7 @@ public class NamespaceService { if (LOG.isDebugEnabled()) { LOG.debug("Broker not found for SLA Monitoring Namespace {}", - candidateBroker + ":" + config.getWebServicePort()); + candidateBroker + ":" + config.getWebServicePort().get()); } return false; } @@ -972,11 +972,22 @@ public class NamespaceService { } public static String getHeartbeatNamespace(String host, ServiceConfiguration config) { - return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, config.getWebServicePort()); + Integer port = null; + if (config.getWebServicePort().isPresent()) { + port = config.getWebServicePort().get(); + } else if (config.getWebServicePortTls().isPresent()) { + port = config.getWebServicePortTls().get(); + } + return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, port); } - - public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) { - return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, config.getWebServicePort()); + public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) { + Integer port = null; + if (config.getWebServicePort().isPresent()) { + port = config.getWebServicePort().get(); + } else if (config.getWebServicePortTls().isPresent()) { + port = config.getWebServicePortTls().get(); + } + return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port); } public static String checkHeartbeatNamespace(ServiceUnitId ns) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 16352a4..a43e404 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -135,8 +135,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies private final PulsarService pulsar; private final ManagedLedgerFactory managedLedgerFactory; - private final int port; - private final int tlsPort; private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics; @@ -192,8 +190,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies public BrokerService(PulsarService pulsar) throws Exception { this.pulsar = pulsar; this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); - this.port = new URI(pulsar.getBrokerServiceUrl()).getPort(); - this.tlsPort = new URI(pulsar.getBrokerServiceUrlTls()).getPort(); this.topics = new ConcurrentOpenHashMap<>(); this.replicationClients = new ConcurrentOpenHashMap<>(); this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds(); @@ -293,20 +289,24 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false)); - // Bind and start to accept incoming connections. - InetSocketAddress addr = new InetSocketAddress(pulsar.getBindAddress(), port); - try { - bootstrap.bind(addr).sync(); - } catch (Exception e) { - throw new IOException("Failed to bind Pulsar broker on " + addr, e); + Optional<Integer> port = serviceConfig.getBrokerServicePort(); + if (port.isPresent()) { + // Bind and start to accept incoming connections. + InetSocketAddress addr = new InetSocketAddress(pulsar.getBindAddress(), port.get()); + try { + bootstrap.bind(addr).sync(); + } catch (Exception e) { + throw new IOException("Failed to bind Pulsar broker on " + addr, e); + } + log.info("Started Pulsar Broker service on port {}", port.get()); } - log.info("Started Pulsar Broker service on port {}", port); - - if (serviceConfig.isTlsEnabled()) { + + Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls(); + if (tlsPort.isPresent()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true)); - tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort)).sync(); - log.info("Started Pulsar Broker TLS service on port {} - TLS provider: {}", tlsPort, + tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort.get())).sync(); + log.info("Started Pulsar Broker TLS service on port {} - TLS provider: {}", tlsPort.get(), SslContext.defaultServerProvider()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index a955711..81e5a40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -320,7 +320,7 @@ public abstract class PulsarWebResource { private URI getRedirectionUrl(ClusterData differentClusterData) throws MalformedURLException { URL webUrl = null; - if (isRequestHttps() && pulsar.getConfiguration().isTlsEnabled() + if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent() && StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) { webUrl = new URL(differentClusterData.getServiceUrlTls()); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 02243b9..4ca8fcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TimeZone; import javax.servlet.DispatcherType; @@ -81,12 +82,16 @@ public class WebService implements AutoCloseable { this.server = new Server(webServiceExecutor); List<ServerConnector> connectors = new ArrayList<>(); - ServerConnector connector = new PulsarServerConnector(server, 1, 1); - connector.setPort(pulsar.getConfiguration().getWebServicePort()); - connector.setHost(pulsar.getBindAddress()); - connectors.add(connector); + Optional<Integer> port = pulsar.getConfiguration().getWebServicePort(); + if (port.isPresent()) { + ServerConnector connector = new PulsarServerConnector(server, 1, 1); + connector.setPort(port.get()); + connector.setHost(pulsar.getBindAddress()); + connectors.add(connector); + } - if (pulsar.getConfiguration().isTlsEnabled()) { + Optional<Integer> tlsPort = pulsar.getConfiguration().getWebServicePortTls(); + if (tlsPort.isPresent()) { try { SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory( pulsar.getConfiguration().isTlsAllowInsecureConnection(), @@ -95,7 +100,7 @@ public class WebService implements AutoCloseable { pulsar.getConfiguration().getTlsKeyFilePath(), pulsar.getConfiguration().isTlsRequireTrustedClientCertOnConnect()); ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory); - tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls()); + tlsConnector.setPort(tlsPort.get()); tlsConnector.setHost(pulsar.getBindAddress()); connectors.add(tlsConnector); } catch (GeneralSecurityException e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index c16d676..0816625 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -79,18 +79,23 @@ public class CompactorTool { arguments.brokerConfigFile, ServiceConfiguration.class); } - String pulsarServiceUrl = PulsarService.brokerUrl(brokerConfig); ClientBuilder clientBuilder = PulsarClient.builder(); if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) { clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(), brokerConfig.getBrokerClientAuthenticationParameters()); } - clientBuilder.serviceUrl(pulsarServiceUrl) - .enableTls(brokerConfig.isTlsEnabled()) - .allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection()) - .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath()); + + if (brokerConfig.getBrokerServicePortTls().isPresent()) { + clientBuilder.serviceUrl(PulsarService.brokerUrlTls(brokerConfig)) + .allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection()) + .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath()); + + } else { + clientBuilder.serviceUrl(PulsarService.brokerUrl(brokerConfig)); + } + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 233d6b7..cddf3ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -217,8 +217,8 @@ public class SLAMonitoringTest { try { pulsarServices[crashIndex] = new PulsarService(configurations[crashIndex]); pulsarServices[crashIndex].start(); - assertEquals(pulsarServices[crashIndex].getConfiguration().getBrokerServicePort(), - brokerNativeBrokerPorts[crashIndex]); + assertEquals(pulsarServices[crashIndex].getConfiguration().getBrokerServicePort().get(), + new Integer(brokerNativeBrokerPorts[crashIndex])); } catch (PulsarServerException e) { e.printStackTrace(); fail("The broker should be able to start without exception"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index a6eccfc..7a65089 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -140,7 +140,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 0b04ba3..3f90392 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -835,7 +835,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { String namespaceRegex = "other/use/other.*"; String cluster = "use"; String brokerName = pulsar.getAdvertisedAddress(); - String brokerAddress = brokerName + ":" + pulsar.getConfiguration().getWebServicePort(); + String brokerAddress = brokerName + ":" + pulsar.getConfiguration().getWebServicePort().get(); NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData(); nsPolicyData1.namespaces = new ArrayList<String>(); nsPolicyData1.namespaces.add(namespaceRegex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java index ca1817a..b4e2a91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java @@ -61,7 +61,6 @@ public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(getTLSFile("broker.cert")); conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8")); conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java index 2cfb009..e478631 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java @@ -83,7 +83,6 @@ public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest { private void buildConf(ServiceConfiguration conf) { conf.setLoadBalancerEnabled(true); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(getTLSFile("broker.cert")); conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8")); conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 744eb7b..8fb174d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -139,7 +139,6 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setLoadBalancerEnabled(true); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 8088637..befc46d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -174,7 +174,7 @@ public class AntiAffinityNamespaceGroupTest { private void createCluster(ZooKeeper zk, ServiceConfiguration config) throws Exception { ZkUtils.createFullPathOptimistic(zk, "/admin/clusters/" + config.getClusterName(), ObjectMapperFactory.getThreadLocal().writeValueAsBytes( - new ClusterData("http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort())), + new ClusterData("http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort().get())), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index b90fa6f..a959dee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -148,7 +148,7 @@ public class LoadBalancerTest { pulsarServices[i].start(); brokerUrls[i] = new URL("http://127.0.0.1" + ":" + brokerWebServicePorts[i]); - lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() + ":" + config.getWebServicePort(); + lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() + ":" + config.getWebServicePort().get(); pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 78b591d..aaa6079 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -251,7 +251,7 @@ public class SimpleLoadManagerImplTest { rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024)); ResourceUnit ru1 = new SimpleResourceUnit( - "http://" + pulsar1.getAdvertisedAddress() + ":" + pulsar1.getConfiguration().getWebServicePort(), rd); + "http://" + pulsar1.getAdvertisedAddress() + ":" + pulsar1.getConfiguration().getWebServicePort().get(), rd); Set<ResourceUnit> rus = new HashSet<ResourceUnit>(); rus.add(ru1); LoadRanker lr = new ResourceAvailabilityRanker(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index dfca1b2..1c11f00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -93,7 +93,8 @@ public class OwnershipCacheTest { doReturn(localCache).when(pulsar).getLocalZkCacheService(); doReturn(config).when(pulsar).getConfiguration(); doReturn(nsService).when(pulsar).getNamespaceService(); - doReturn(port).when(config).getBrokerServicePort(); + doReturn(Optional.ofNullable(new Integer(port))).when(config).getBrokerServicePort(); + doReturn(Optional.ofNullable(null)).when(config).getWebServicePort(); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(webAddress(config)).when(pulsar).getWebServiceAddress(); doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 54d1644..4e9f4e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -408,7 +408,6 @@ public class BrokerServiceTest extends BrokerTestBase { PulsarClient pulsarClient = null; conf.setAuthenticationEnabled(false); - conf.setTlsEnabled(false); restartBroker(); // Case 1: Access without TLS @@ -447,7 +446,6 @@ public class BrokerServiceTest extends BrokerTestBase { final String subName = "newSub"; conf.setAuthenticationEnabled(false); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); restartBroker(); @@ -525,7 +523,6 @@ public class BrokerServiceTest extends BrokerTestBase { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(true); @@ -584,7 +581,6 @@ public class BrokerServiceTest extends BrokerTestBase { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(false); @@ -642,7 +638,6 @@ public class BrokerServiceTest extends BrokerTestBase { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders(providers); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index b65b13f..9895e7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1165,7 +1165,7 @@ public class PersistentTopicTest { ConcurrentOpenHashMap<String, Replicator> replicatorMap = topic.getReplicators(); final URL brokerUrl = new URL( - "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); + "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort().get()); PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build(); ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); @@ -1209,7 +1209,7 @@ public class PersistentTopicTest { PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); final URL brokerUrl = new URL( - "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); + "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort().get()); PulsarClient client = spy(PulsarClient.builder().serviceUrl(brokerUrl.toString()).build()); PulsarClientImpl clientImpl = (PulsarClientImpl) client; doReturn(new CompletableFuture<Producer>()).when(clientImpl) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 10754b9..8dd10f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -125,7 +125,6 @@ public class ReplicatorTestBase { inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config1.setBrokerServicePort(PortManager.nextFreePort()); config1.setBrokerServicePortTls(PortManager.nextFreePort()); - config1.setTlsEnabled(true); config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); @@ -159,7 +158,6 @@ public class ReplicatorTestBase { inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config2.setBrokerServicePort(PortManager.nextFreePort()); config2.setBrokerServicePortTls(PortManager.nextFreePort()); - config2.setTlsEnabled(true); config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java index cdc7c06..00b69a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java @@ -124,7 +124,6 @@ public class V1_ReplicatorTestBase { inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config1.setBrokerServicePort(PortManager.nextFreePort()); config1.setBrokerServicePortTls(PortManager.nextFreePort()); - config1.setTlsEnabled(true); config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); @@ -158,7 +157,6 @@ public class V1_ReplicatorTestBase { inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config2.setBrokerServicePort(PortManager.nextFreePort()); config2.setBrokerServicePortTls(PortManager.nextFreePort()); - config2.setTlsEnabled(true); config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 30e4554..38cf4cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -243,13 +243,14 @@ public class WebServiceTest { ServiceConfiguration config = new ServiceConfiguration(); config.setAdvertisedAddress("localhost"); config.setWebServicePort(BROKER_WEBSERVICE_PORT); - config.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + if (enableTls) { + config.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); + } config.setClientLibraryVersionCheckEnabled(enableFilter); config.setAuthenticationEnabled(enableAuth); config.setAuthenticationProviders(providers); config.setAuthorizationEnabled(false); config.setSuperUserRoles(roles); - config.setTlsEnabled(enableTls); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(allowInsecure); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index d2f6a8d..4923894 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -71,7 +71,6 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(true); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java index fef021e..dbd893a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java @@ -72,7 +72,6 @@ public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerB conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(true); conf.setTlsAllowInsecureConnection(true); Set<String> superUserRoles = new HashSet<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 7cf4843..54b10ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -169,7 +169,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { /**** started broker-2 ****/ - URI brokerServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort()); + URI brokerServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort().get()); PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build(); // load namespace-bundle by calling Broker2 @@ -228,7 +228,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(newCluster); // Broker2 serves newCluster conf2.setZookeeperServers("localhost:2181"); - String broker2ServiceUrl = "pulsar://localhost:" + conf2.getBrokerServicePort(); + String broker2ServiceUrl = "pulsar://localhost:" + conf2.getBrokerServicePort().get(); admin.clusters().createCluster(newCluster, new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT, null, broker2ServiceUrl, null)); @@ -394,7 +394,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { conf2.setWebServicePortTls(PortManager.nextFreePort()); conf2.setAdvertisedAddress("localhost"); conf2.setTlsAllowInsecureConnection(true); - conf2.setTlsEnabled(true); conf2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf2.setClusterName(conf.getClusterName()); @@ -403,7 +402,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { // restart broker1 with tls enabled conf.setTlsAllowInsecureConnection(true); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); stopBroker(); @@ -429,7 +427,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { /**** started broker-2 ****/ - URI brokerServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort()); + URI brokerServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort().get()); PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build(); final String lookupResourceUrl = "/lookup/v2/topic/persistent/my-property/my-ns/my-topic1"; @@ -455,11 +453,11 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { con.connect(); log.info("connected url: {} ", con.getURL()); // assert connect-url: broker2-https - assertEquals(con.getURL().getPort(), conf2.getWebServicePortTls()); + assertEquals(new Integer(con.getURL().getPort()), conf2.getWebServicePortTls().get()); InputStream is = con.getInputStream(); // assert redirect-url: broker1-https only log.info("redirected url: {}", con.getURL()); - assertEquals(con.getURL().getPort(), conf.getWebServicePortTls()); + assertEquals(new Integer(con.getURL().getPort()), conf.getWebServicePortTls().get()); is.close(); pulsarClient2.close(); @@ -532,7 +530,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { // (1) restart broker1 with tls enabled conf.setTlsAllowInsecureConnection(true); - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); stopBroker(); @@ -542,7 +539,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { ServiceConfig config = new ServiceConfig(); config.setServicePort(nextFreePort()); config.setServicePortTls(nextFreePort()); - config.setTlsEnabled(true); config.setBindOnLocalhost(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -836,7 +832,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class)); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); - URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort()); + URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort().get()); PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(broker2ServiceUrl.toString()).build(); // (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch @@ -946,7 +942,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class)); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); - URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort()); + URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort().get()); PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(broker2ServiceUrl.toString()).build(); // (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java index 4b73b9e..73a51ad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java @@ -102,6 +102,8 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase { PulsarService pulsarService1 = pulsar; conf.setBrokerServicePort(PortManager.nextFreePort()); conf.setWebServicePort(PortManager.nextFreePort()); + conf.setBrokerServicePortTls(PortManager.nextFreePort()); + conf.setWebServicePortTls(PortManager.nextFreePort()); startBroker(); PulsarService pulsarService2 = pulsar; System.out.println("Pulsar1=" + pulsarService1.getBrokerServiceUrl() + ", Pulsar2=" + pulsarService2.getBrokerServiceUrl()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java index 8e40011..5d966cb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java @@ -58,7 +58,6 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase { } protected void internalSetUpForBroker() throws Exception { - conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 4bb4fb9..fd51c06 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -56,7 +56,8 @@ public class ServiceConfigurationTest { InputStream newStream = updateProp(zookeeperServer, String.valueOf(brokerServicePort), "ns1,ns2"); final ServiceConfiguration config = PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class); assertTrue(isNotBlank(config.getZookeeperServers())); - assertTrue(config.getBrokerServicePort() == brokerServicePort); + assertTrue(config.getBrokerServicePort().isPresent() + && config.getBrokerServicePort().get().equals(brokerServicePort)); assertEquals(config.getBootstrapNamespaces().get(1), "ns2"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 5210491..5cdf5a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -148,8 +148,8 @@ public class PulsarWorkerAssignmentTest { org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); // worker talks to local broker - workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort()); - workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort()); + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get()); + workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get()); workerConfig.setFailureCheckFreqMs(100); workerConfig.setNumFunctionPackageReplicas(1); workerConfig.setClusterCoordinationTopicName("coordinate"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index a2a5b28..b9c2162 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -117,7 +117,6 @@ public class PulsarFunctionAdminTest { providers.add(AuthenticationProviderTls.class.getName()); config.setAuthenticationEnabled(true); config.setAuthenticationProviders(providers); - config.setTlsEnabled(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(true); @@ -180,8 +179,8 @@ public class PulsarFunctionAdminTest { org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); // worker talks to local broker - workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls()); - workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls()); + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()); + workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get()); workerConfig.setFailureCheckFreqMs(100); workerConfig.setNumFunctionPackageReplicas(1); workerConfig.setClusterCoordinationTopicName("coordinate"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 1980bce..7841453 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -149,7 +149,6 @@ public class PulsarFunctionE2ETest { providers.add(AuthenticationProviderTls.class.getName()); config.setAuthenticationEnabled(true); config.setAuthenticationProviders(providers); - config.setTlsEnabled(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(true); @@ -212,8 +211,8 @@ public class PulsarFunctionE2ETest { org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); // worker talks to local broker - workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls()); - workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls()); + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()); + workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get()); workerConfig.setFailureCheckFreqMs(100); workerConfig.setNumFunctionPackageReplicas(1); workerConfig.setClusterCoordinationTopicName("coordinate"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index fb081e5..a37786f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -106,7 +106,6 @@ public class PulsarFunctionTlsTest { providers.add(AuthenticationProviderTls.class.getName()); config.setAuthenticationEnabled(true); config.setAuthenticationProviders(providers); - config.setTlsEnabled(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(true); @@ -166,8 +165,8 @@ public class PulsarFunctionTlsTest { org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); // worker talks to local broker - workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls()); - workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls()); + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get()); + workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePort().get()); workerConfig.setFailureCheckFreqMs(100); workerConfig.setNumFunctionPackageReplicas(1); workerConfig.setClusterCoordinationTopicName("coordinate"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 9ddf955..6f08cc8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -407,7 +407,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); final String baseUrl = pulsar.getWebServiceAddress() - .replace(Integer.toString(pulsar.getConfiguration().getWebServicePort()), (Integer.toString(port))) + .replace(Integer.toString(pulsar.getConfiguration().getWebServicePort().get()), (Integer.toString(port))) + "/admin/v2/proxy-stats/"; // verify proxy metrics diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index 650e05e..96f3c75 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -67,7 +67,7 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase { WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); config.setWebServicePort(port); config.setWebServicePortTls(tlsPort); - config.setTlsEnabled(true); + config.setBrokerClientTlsEnabled(true); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java index c02baaa..ee8531d 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.discovery.service; +import com.google.common.base.Preconditions; import static com.google.common.base.Preconditions.checkNotNull; import java.io.Closeable; @@ -104,15 +105,23 @@ public class DiscoveryService implements Closeable { bootstrap.childHandler(new ServiceChannelInitializer(this, config, false)); // Bind and start to accept incoming connections. - bootstrap.bind(config.getServicePort()).sync(); - LOG.info("Started Pulsar Discovery service on port {}", config.getServicePort()); - - if (config.isTlsEnabled()) { + + Preconditions.checkArgument(config.getServicePort().isPresent() || config.getServicePortTls().isPresent(), + "Either ServicePort or ServicePortTls should be configured."); + + if (config.getServicePort().isPresent()) { + // Bind and start to accept incoming connections. + bootstrap.bind(config.getServicePort().get()).sync(); + LOG.info("Started Pulsar Discovery service on port {}", config.getServicePort()); + } + + if (config.getServicePortTls().isPresent()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new ServiceChannelInitializer(this, config, true)); - tlsBootstrap.bind(config.getServicePortTls()).sync(); - LOG.info("Started Pulsar Discovery TLS service on port {}", config.getServicePortTls()); + tlsBootstrap.bind(config.getServicePortTls().get()).sync(); + LOG.info("Started Pulsar Discovery TLS service on port {}", config.getServicePortTls().get()); } + } public ZooKeeperClientFactory getZooKeeperClientFactory() { @@ -153,15 +162,20 @@ public class DiscoveryService implements Closeable { } public String serviceUrl() { - return new StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePort()).toString(); + if (config.getServicePort().isPresent()) { + return new StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePort().get()) + .toString(); + } else { + return null; + } } public String serviceUrlTls() { - if (config.isTlsEnabled()) { - return new StringBuilder("pulsar+ssl://").append(host()).append(":").append(config.getServicePortTls()) + if (config.getServicePortTls().isPresent()) { + return new StringBuilder("pulsar+ssl://").append(host()).append(":").append(config.getServicePortTls().get()) .toString(); } else { - return ""; + return null; } } diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java index c87cd7d..4cd3239 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java @@ -53,21 +53,21 @@ public class ServerManager { private final Server server; private final ExecutorThreadPool webServiceExecutor; private final List<Handler> handlers = Lists.newArrayList(); - protected final int externalServicePort; public ServerManager(ServiceConfig config) { this.webServiceExecutor = new ExecutorThreadPool(); this.webServiceExecutor.setName("pulsar-external-web"); this.server = new Server(webServiceExecutor); - this.externalServicePort = config.getWebServicePort(); List<ServerConnector> connectors = Lists.newArrayList(); - ServerConnector connector = new ServerConnector(server, 1, 1); - connector.setPort(externalServicePort); - connectors.add(connector); + if (config.getWebServicePort().isPresent()) { + ServerConnector connector = new ServerConnector(server, 1, 1); + connector.setPort(config.getWebServicePort().get()); + connectors.add(connector); + } - if (config.isTlsEnabled()) { + if (config.getWebServicePortTls().isPresent()) { try { SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory( config.isTlsAllowInsecureConnection(), @@ -76,7 +76,7 @@ public class ServerManager { config.getTlsKeyFilePath(), config.getTlsRequireTrustedClientCertOnConnect()); ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory); - tlsConnector.setPort(config.getWebServicePortTls()); + tlsConnector.setPort(config.getWebServicePortTls().get()); connectors.add(tlsConnector); } catch (GeneralSecurityException e) { throw new RestException(e); @@ -102,10 +102,6 @@ public class ServerManager { handlers.add(context); } - public int getExternalServicePort() { - return externalServicePort; - } - public void start() throws Exception { RequestLogHandler requestLogHandler = new RequestLogHandler(); Slf4jRequestLog requestLog = new Slf4jRequestLog(); diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java index c024c19..bc7900e 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.discovery.service.server; +import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -45,13 +46,13 @@ public class ServiceConfig implements PulsarConfiguration { private int zookeeperSessionTimeoutMs = 30_000; // Port to use to server binary-proto request - private int servicePort = 5000; + private Integer servicePort = 5000; // Port to use to server binary-proto-tls request - private int servicePortTls = 5001; + private Integer servicePortTls; // Port to use to server HTTP request - private int webServicePort = 8080; + private Integer webServicePort = 8080; // Port to use to server HTTPS request - private int webServicePortTls = 8443; + private Integer webServicePortTls; // Control whether to bind directly on localhost rather than on normal // hostname private boolean bindOnLocalhost = false; @@ -75,7 +76,7 @@ public class ServiceConfig implements PulsarConfiguration { private String authorizationProvider = PulsarAuthorizationProvider.class.getName(); /***** --- TLS --- ****/ - // Enable TLS + @Deprecated private boolean tlsEnabled = false; // Path for the TLS certificate file private String tlsCertificateFilePath; @@ -131,42 +132,44 @@ public class ServiceConfig implements PulsarConfiguration { this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs; } - public int getServicePort() { - return servicePort; + public Optional<Integer> getServicePort() { + return Optional.ofNullable(servicePort); } public void setServicePort(int servicePort) { this.servicePort = servicePort; } - public int getServicePortTls() { - return servicePortTls; + public Optional<Integer> getServicePortTls() { + return Optional.ofNullable(servicePortTls); } public void setServicePortTls(int servicePortTls) { this.servicePortTls = servicePortTls; } - public int getWebServicePort() { - return webServicePort; + public Optional<Integer> getWebServicePort() { + return Optional.ofNullable(webServicePort); } public void setWebServicePort(int webServicePort) { this.webServicePort = webServicePort; } - public int getWebServicePortTls() { - return webServicePortTls; + public Optional<Integer> getWebServicePortTls() { + return Optional.ofNullable(webServicePortTls); } public void setWebServicePortTls(int webServicePortTls) { this.webServicePortTls = webServicePortTls; } + @Deprecated public boolean isTlsEnabled() { - return tlsEnabled; + return tlsEnabled || webServicePortTls != null || servicePortTls != null; } + @Deprecated public void setTlsEnabled(boolean tlsEnabled) { this.tlsEnabled = tlsEnabled; } diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java index 540d3b8..88e4a7f 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java @@ -53,7 +53,6 @@ public class BaseDiscoveryTestSetup { config.setServicePortTls(nextFreePort()); config.setBindOnLocalhost(true); - config.setTlsEnabled(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java index 648f574..452daaa 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java @@ -194,7 +194,6 @@ public class DiscoveryServiceWebTest extends BaseZKStarterTest{ ServiceConfig config = new ServiceConfig(); config.setWebServicePort(port); config.setWebServicePortTls(tlsPort); - config.setTlsEnabled(true); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); ServerManager server = new ServerManager(config); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 0cc214c..d49722f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -55,8 +55,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private String workerId; private String workerHostname; - private int workerPort; - private int workerPortTls; + private Integer workerPort; + private Integer workerPortTls; private String connectorsDirectory = "./connectors"; private String functionMetadataTopicName; private String functionWebServiceUrl; @@ -80,7 +80,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { // Frequency how often worker performs compaction on function-topics private long topicCompactionFrequencySec = 30 * 60; // 30 minutes /***** --- TLS --- ****/ - // Enable TLS + @Deprecated private boolean tlsEnabled = false; // Path for the TLS certificate file private String tlsCertificateFilePath; @@ -91,6 +91,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { // Accept untrusted TLS certificate from client private boolean tlsAllowInsecureConnection = false; private boolean tlsRequireTrustedClientCertOnConnect = false; + // TLS for Functions -> Broker private boolean useTls = false; private boolean tlsHostnameVerificationEnable = false; // Enforce authentication @@ -104,6 +105,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private Properties properties = new Properties(); + public boolean getTlsEnabled() { + return tlsEnabled || workerPortTls != null; + } + + @Data @Setter @Getter diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 2a3e04c..0e28037 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -115,7 +115,7 @@ public class WorkerServer { handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler(), requestLogHandler }); server.setHandler(handlerCollection); - if (this.workerConfig.isTlsEnabled()) { + if (this.workerConfig.getWorkerPortTls() != null) { try { SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory( this.workerConfig.isTlsAllowInsecureConnection(), this.workerConfig.getTlsTrustCertsFilePath(), diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java index 4c6d249..1d60aa4 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java @@ -45,7 +45,7 @@ public class WorkerApiV2ResourceConfigTest { assertEquals("sample/standalone/functions", wc.getPulsarFunctionsNamespace()); assertEquals(3, wc.getNumFunctionPackageReplicas()); assertEquals(TEST_NAME + "-worker", wc.getWorkerId()); - assertEquals(1234, wc.getWorkerPort()); + assertEquals(new Integer(1234), wc.getWorkerPort()); } @Test @@ -57,7 +57,7 @@ public class WorkerApiV2ResourceConfigTest { assertEquals("test-function-metadata-topic", wc.getFunctionMetadataTopicName()); assertEquals(3, wc.getNumFunctionPackageReplicas()); assertEquals("test-worker", wc.getWorkerId()); - assertEquals(7654, wc.getWorkerPort()); + assertEquals(new Integer(7654), wc.getWorkerPort()); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 45eba27..717019b 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -23,6 +23,7 @@ import com.google.common.collect.Sets; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -129,23 +130,23 @@ public class ProxyConfiguration implements PulsarConfiguration { category = CATEGORY_SERVER, doc = "The port for serving binary protobuf request" ) - private int servicePort = 6650; + private Integer servicePort = 6650; @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving tls secured binary protobuf request" ) - private int servicePortTls = 6651; + private Integer servicePortTls; @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving http requests" ) - private int webServicePort = 8080; + private Integer webServicePort = 8080; @FieldContext( category = CATEGORY_SERVER, doc = "The port for serving https requests" ) - private int webServicePortTls = 8443; + private Integer webServicePortTls; @FieldContext( category = CATEGORY_SERVER, @@ -225,10 +226,8 @@ public class ProxyConfiguration implements PulsarConfiguration { private boolean tlsEnabledWithBroker = false; /***** --- TLS --- ****/ - @FieldContext( - category = CATEGORY_TLS, - doc = "Whether TLS is enabled for the proxy" - ) + + @Deprecated private boolean tlsEnabledInProxy = false; @FieldContext( @@ -332,6 +331,22 @@ public class ProxyConfiguration implements PulsarConfiguration { return properties; } + public Optional<Integer> getServicePort() { + return Optional.ofNullable(servicePort); + } + + public Optional<Integer> getServicePortTls() { + return Optional.ofNullable(servicePortTls); + } + + public Optional<Integer> getWebServicePort() { + return Optional.ofNullable(webServicePort); + } + + public Optional<Integer> getWebServicePortTls() { + return Optional.ofNullable(webServicePortTls); + } + public void setProperties(Properties properties) { this.properties = properties; @@ -387,4 +402,4 @@ public class ProxyConfiguration implements PulsarConfiguration { return String.format("HttpReverseProxyConfig(%s, path=%s, proxyTo=%s)", name, path, proxyTo); } } -} +} \ No newline at end of file diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index eaf5f04..317daa3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -104,8 +104,17 @@ public class ProxyService implements Closeable { } catch (UnknownHostException e) { throw new RuntimeException(e); } - this.serviceUrl = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort()); - this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls()); + if (proxyConfig.getServicePort().isPresent()) { + this.serviceUrl = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort().get()); + } else { + this.serviceUrl = null; + } + + if (proxyConfig.getServicePortTls().isPresent()) { + this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls().get()); + } else { + this.serviceUrlTls = null; + } this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory); this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory); @@ -132,18 +141,21 @@ public class ProxyService implements Closeable { bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false)); // Bind and start to accept incoming connections. - try { - bootstrap.bind(proxyConfig.getServicePort()).sync(); - } catch (Exception e) { - throw new IOException("Failed to bind Pulsar Proxy on port " + proxyConfig.getServicePort(), e); + if (proxyConfig.getServicePort().isPresent()) { + try { + bootstrap.bind(proxyConfig.getServicePort().get()).sync(); + LOG.info("Started Pulsar Proxy at {}", serviceUrl); + } catch (Exception e) { + throw new IOException("Failed to bind Pulsar Proxy on port " + proxyConfig.getServicePort().get(), e); + } } LOG.info("Started Pulsar Proxy at {}", serviceUrl); - if (proxyConfig.isTlsEnabledInProxy()) { + if (proxyConfig.getServicePortTls().isPresent()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true)); - tlsBootstrap.bind(proxyConfig.getServicePortTls()).sync(); - LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getServicePortTls()); + tlsBootstrap.bind(proxyConfig.getServicePortTls().get()).sync(); + LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getServicePortTls().get()); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index 07d7574..20f6b56 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -82,7 +82,6 @@ public class WebServer { this.webServiceExecutor = new ExecutorThreadPool(); this.webServiceExecutor.setName("pulsar-external-web"); this.server = new Server(webServiceExecutor); - this.externalServicePort = config.getWebServicePort(); this.authenticationService = authenticationService; this.config = config; @@ -91,11 +90,13 @@ public class WebServer { HttpConfiguration http_config = new HttpConfiguration(); http_config.setOutputBufferSize(config.getHttpOutputBufferSize()); - ServerConnector connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config)); - connector.setPort(externalServicePort); - connectors.add(connector); - - if (config.isTlsEnabledInProxy()) { + if (config.getWebServicePort().isPresent()) { + this.externalServicePort = config.getWebServicePort().get(); + ServerConnector connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(http_config)); + connector.setPort(externalServicePort); + connectors.add(connector); + } + if (config.getWebServicePortTls().isPresent()) { try { SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory( config.isTlsAllowInsecureConnection(), @@ -104,7 +105,7 @@ public class WebServer { config.getTlsKeyFilePath(), config.isTlsRequireTrustedClientCertOnConnect()); ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory); - tlsConnector.setPort(config.getWebServicePortTls()); + tlsConnector.setPort(config.getWebServicePortTls().get()); connectors.add(tlsConnector); } catch (GeneralSecurityException e) { throw new RuntimeException(e); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index 7c638f4..096ff05 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -62,7 +62,6 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { // enable tls and auth&auth at broker conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(true); conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert")); conf.setTlsCertificateFilePath(getTlsFile("broker.cert")); conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8")); @@ -80,7 +79,6 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -130,7 +128,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { PulsarAdmin getAdminClient(String user) throws Exception { return PulsarAdmin.builder() - .serviceHttpUrl("https://localhost:" + proxyConfig.getWebServicePortTls()) + .serviceHttpUrl("https://localhost:" + proxyConfig.getWebServicePortTls().get()) .tlsTrustCertsFilePath(getTlsFile("ca.cert")) .allowTlsInsecureConnection(false) .authentication(AuthenticationTls.class.getName(), diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index c31773f..50cd920 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -74,7 +74,6 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(true); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -105,7 +104,6 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -154,7 +152,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase public void testTlsSyncProducerAndConsumer() throws Exception { log.info("-- Starting {} test --", methodName); - final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); + final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls().get(); Map<String, String> authParams = Maps.newHashMap(); authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 6a3a799..694eb75 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -162,7 +162,6 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { servicePort = PortManager.nextFreePort(); conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(false); conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); // Expires after an hour conf.setBrokerClientAuthenticationParameters( diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 20ebeb1..4813615 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -70,12 +70,12 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { @Test public void testInboundConnection() throws Exception { LOG.info("Creating producer 1"); - PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) + PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .build(); Producer<byte[]> producer1 = client1.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create(); LOG.info("Creating producer 2"); - PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) + PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .build(); Producer<byte[]> producer2; Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 42915ec..4997de0 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -54,7 +54,6 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { servicePort = PortManager.nextFreePort(); conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(false); conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); conf.setBrokerClientAuthenticationParameters("authParam:broker"); conf.setAuthenticateOriginalAuthData(true); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 35dfb94..9c8ea85 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -70,7 +70,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { @Test public void testLookup() throws Exception { - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .connectionsPerBroker(5).ioThreads(5).build(); assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire()); assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 69f4f5a..7d93ca6 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -152,7 +152,6 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase { servicePort = PortManager.nextFreePort(); conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(false); conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); conf.setBrokerClientAuthenticationParameters("authParam:broker"); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 09fdbdb..be47d9e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -91,7 +91,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { @Test public void testProducer() throws Exception { - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .build(); Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic") .create(); @@ -105,7 +105,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { @Test public void testProducerConsumer() throws Exception { - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .build(); Producer<byte[]> producer = client.newProducer(Schema.BYTES) .topic("persistent://sample/test/local/producer-consumer-topic") @@ -137,7 +137,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { @Test public void testPartitions() throws Exception { admin.tenants().createTenant("sample", new TenantInfo()); - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .build(); admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2); @@ -164,7 +164,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { @Test public void testRegexSubscription() throws Exception { - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .connectionsPerBroker(5).ioThreads(5).build(); // create two topics by subscribing to a topic and closing it @@ -207,7 +207,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { @Test private void testProtocolVersionAdvertisement() throws Exception { - final String url = "pulsar://localhost:" + proxyConfig.getServicePort(); + final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get(); final String topic = "persistent://sample/test/local/protocol-version-advertisement"; final String sub = "my-sub"; diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index db5070f..2bf544d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -58,7 +58,6 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(false); proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); @@ -83,7 +82,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { @Test public void testProducer() throws Exception { PulsarClient client = PulsarClient.builder() - .serviceUrl("pulsar+ssl://localhost:" + proxyConfig.getServicePortTls()).enableTls(true) + .serviceUrl("pulsar+ssl://localhost:" + proxyConfig.getServicePortTls().get()).enableTls(true) .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build(); Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/topic").create(); @@ -97,7 +96,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { @Test public void testPartitions() throws Exception { PulsarClient client = PulsarClient.builder() - .serviceUrl("pulsar+ssl://localhost:" + proxyConfig.getServicePortTls()).enableTls(true) + .serviceUrl("pulsar+ssl://localhost:" + proxyConfig.getServicePortTls().get()).enableTls(true) .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build(); admin.tenants().createTenant("sample", new TenantInfo()); admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index 5ee0dac..1fb9cb4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -78,7 +78,6 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(true); conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH); @@ -110,7 +109,6 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -158,7 +156,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { log.info("-- Starting {} test --", methodName); createAdminClient(); - final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); + final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls().get(); // create a client which connects to proxy over tls and pass authData PulsarClient proxyClient = createPulsarClient(proxyServiceUrl); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index d894b08..1f743e2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -142,7 +142,6 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(true); conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH); @@ -174,7 +173,6 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -223,7 +221,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { startProxy(); createAdminClient(); - final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); + final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls().get(); // create a client which connects to proxy over tls and pass authData PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, PulsarClient.builder()); @@ -274,7 +272,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { startProxy(); createAdminClient(); - final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); + final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls().get(); // create a client which connects to proxy over tls and pass authData PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, PulsarClient.builder().enableTlsHostnameVerification(hostnameVerificationEnabled)); @@ -324,7 +322,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { proxyConfig.setTlsHostnameVerificationEnabled(hostnameVerificationEnabled); startProxy(); createAdminClient(); - final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); + final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls().get(); // create a client which connects to proxy over tls and pass authData PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, PulsarClient.builder().operationTimeout(1, TimeUnit.SECONDS)); @@ -385,7 +383,6 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -418,7 +415,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { }, 3, 1000); try { - final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); + final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls().get(); PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, PulsarClient.builder()); Consumer<byte[]> consumer = proxyClient.newConsumer() .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index 4909a89..825982c 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -70,7 +70,6 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(false); - conf.setTlsEnabled(true); conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); @@ -102,7 +101,6 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -149,7 +147,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { public void testDiscoveryService() throws Exception { log.info("-- Starting {} test --", methodName); - final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); + final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls().get(); Map<String, String> authParams = Maps.newHashMap(); authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index 9d5317c..4bca6d4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -64,7 +64,6 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); - conf.setTlsEnabled(true); conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert")); conf.setTlsCertificateFilePath(getTlsFile("broker.cert")); conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8")); @@ -82,7 +81,6 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas proxyConfig.setServicePortTls(PortManager.nextFreePort()); proxyConfig.setWebServicePort(PortManager.nextFreePort()); proxyConfig.setWebServicePortTls(PortManager.nextFreePort()); - proxyConfig.setTlsEnabledInProxy(true); proxyConfig.setTlsEnabledWithBroker(true); // enable tls and auth&auth at proxy @@ -121,7 +119,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas PulsarAdmin getAdminClient(String user) throws Exception { return PulsarAdmin.builder() - .serviceHttpUrl("https://localhost:" + proxyConfig.getWebServicePortTls()) + .serviceHttpUrl("https://localhost:" + proxyConfig.getWebServicePortTls().get()) .tlsTrustCertsFilePath(getTlsFile("ca.cert")) .allowTlsInsecureConnection(false) .authentication(AuthenticationTls.class.getName(), diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index 5085d14..4923005 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.proxy.server; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import java.util.HashSet; @@ -26,7 +25,6 @@ import java.util.List; import java.util.Set; import javax.servlet.http.HttpServletRequest; -import javax.validation.constraints.AssertTrue; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; @@ -102,7 +100,7 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { @Test public void testUnauthenticatedProxy() throws Exception { PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl("http://127.0.0.1:" + proxyConfig.getWebServicePort()) + .serviceHttpUrl("http://127.0.0.1:" + proxyConfig.getWebServicePort().get()) .build(); List<String> activeBrokers = admin.brokers().getActiveBrokers(configClusterName); Assert.assertEquals(activeBrokers.size(), 1); @@ -113,7 +111,7 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { @Test public void testVipStatus() throws Exception { Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); - WebTarget webTarget = client.target("http://127.0.0.1:" + proxyConfig.getWebServicePort()) + WebTarget webTarget = client.target("http://127.0.0.1:" + proxyConfig.getWebServicePort().get()) .path("/status.html"); String response = webTarget.request().get(String.class); Assert.assertEquals(response, "OK"); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 6660cd8..b984f81 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -190,12 +190,12 @@ public class WebSocketService implements Closeable { config.getBrokerClientAuthenticationParameters()); } - if (config.isTlsEnabled()) { - if (isNotBlank(clusterData.getBrokerServiceUrlTls())) { - clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls()); - } else if (isNotBlank(clusterData.getServiceUrlTls())) { - clientBuilder.serviceUrl(clusterData.getServiceUrlTls()); - } + if (config.isBrokerClientTlsEnabled()) { + if (isNotBlank(clusterData.getBrokerServiceUrlTls())) { + clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls()); + } else if (isNotBlank(clusterData.getServiceUrlTls())) { + clientBuilder.serviceUrl(clusterData.getServiceUrlTls()); + } } else if (isNotBlank(clusterData.getBrokerServiceUrl())) { clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl()); } else { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java index c7fd3b1..1470581 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java @@ -65,13 +65,13 @@ public class ProxyServer { this.server = new Server(executorService); List<ServerConnector> connectors = new ArrayList<>(); - ServerConnector connector = new ServerConnector(server); - - connector.setPort(config.getWebServicePort()); - connectors.add(connector); - + if (config.getWebServicePort().isPresent()) { + ServerConnector connector = new ServerConnector(server); + connector.setPort(config.getWebServicePort().get()); + connectors.add(connector); + } // TLS enabled connector - if (config.isTlsEnabled()) { + if (config.getWebServicePortTls().isPresent()) { try { SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory( config.isTlsAllowInsecureConnection(), @@ -80,12 +80,11 @@ public class ProxyServer { config.getTlsKeyFilePath(), config.getTlsRequireTrustedClientCertOnConnect()); ServerConnector tlsConnector = new ServerConnector(server, -1, -1, sslCtxFactory); - tlsConnector.setPort(config.getWebServicePortTls()); + tlsConnector.setPort(config.getWebServicePortTls().get()); connectors.add(tlsConnector); } catch (GeneralSecurityException e) { throw new PulsarServerException(e); } - } // Limit number of concurrent HTTP connections to avoid getting out of @@ -117,7 +116,7 @@ public class ProxyServer { } public void start() throws PulsarServerException { - log.info("Starting web socket proxy at port {}", conf.getWebServicePort()); + log.info("Starting web socket proxy at port {}", conf.getWebServicePort().get()); try { RequestLogHandler requestLogHandler = new RequestLogHandler(); Slf4jRequestLog requestLog = new Slf4jRequestLog(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index d89a9a0..7f714ae 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.websocket.service; +import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -58,9 +59,9 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { private long zooKeeperSessionTimeoutMillis = 30000; // Port to use to server HTTP request - private int webServicePort = 8080; + private Integer webServicePort = 8080; // Port to use to server HTTPS request - private int webServicePortTls = 8443; + private Integer webServicePortTls; // Hostname or IP address the service binds on, default is 0.0.0.0. private String bindAddress; // --- Authentication --- @@ -99,8 +100,10 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { private String anonymousUserRole = null; /***** --- TLS --- ****/ - // Enable TLS + @Deprecated private boolean tlsEnabled = false; + + private boolean brokerClientTlsEnabled = false; // Path for the TLS certificate file private String tlsCertificateFilePath; // Path for the TLS private key file @@ -189,16 +192,16 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis; } - public int getWebServicePort() { - return webServicePort; + public Optional<Integer> getWebServicePort() { + return Optional.ofNullable(webServicePort); } public void setWebServicePort(int webServicePort) { this.webServicePort = webServicePort; } - public int getWebServicePortTls() { - return webServicePortTls; + public Optional<Integer> getWebServicePortTls() { + return Optional.ofNullable(webServicePortTls); } public void setWebServicePortTls(int webServicePortTls) { @@ -317,12 +320,12 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { this.anonymousUserRole = anonymousUserRole; } - public boolean isTlsEnabled() { - return tlsEnabled; + public boolean isBrokerClientTlsEnabled() { + return brokerClientTlsEnabled || tlsEnabled; } - public void setTlsEnabled(boolean tlsEnabled) { - this.tlsEnabled = tlsEnabled; + public void setBrokerClientTlsEnabled(boolean brokerClientTlsEnabled) { + this.brokerClientTlsEnabled = brokerClientTlsEnabled; } public String getTlsCertificateFilePath() { diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java index a6f3845..888ad96 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java @@ -45,7 +45,6 @@ public class LookupProtocolTest { Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup"); lookupField.setAccessible(true); Assert.assertEquals(lookupField.get(testClient).getClass().getName(), "org.apache.pulsar.client.impl.HttpLookupService"); - Assert.assertFalse(testClient.getConfiguration().isUseTls()); service.close(); } @@ -55,7 +54,7 @@ public class LookupProtocolTest { conf.setServiceUrl("http://localhost:8080"); conf.setServiceUrlTls("https://localhost:8443"); conf.setBrokerServiceUrl("pulsar://localhost:6650"); - conf.setTlsEnabled(true); + conf.setBrokerClientTlsEnabled(true); WebSocketService service = new WebSocketService(conf); PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient(); Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup"); @@ -77,7 +76,6 @@ public class LookupProtocolTest { Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup"); lookupField.setAccessible(true); Assert.assertEquals(lookupField.get(testClient).getClass().getName(), "org.apache.pulsar.client.impl.BinaryProtoLookupService"); - Assert.assertFalse(testClient.getConfiguration().isUseTls()); service.close(); } @@ -88,7 +86,7 @@ public class LookupProtocolTest { conf.setServiceUrlTls("https://localhost:8443"); conf.setBrokerServiceUrl("pulsar://localhost:6650"); conf.setBrokerServiceUrlTls("pulsar+ssl://localhost:6651"); - conf.setTlsEnabled(true); + conf.setBrokerClientTlsEnabled(true); WebSocketService service = new WebSocketService(conf); PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient(); Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");