sijie closed pull request #2988: Start Pulsar in TLS Only mode and deprecate
tlsEnabled flag.
URL: https://github.com/apache/pulsar/pull/2988
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/.gitignore b/.gitignore
index e43e519a71..f605f1e8ac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,6 +20,7 @@
pulsar-broker/src/test/resources/pulsar-functions-api-examples.jar
.project
.settings/
.recommenders/
+.factorypath
# Intellij
.idea/
diff --git a/conf/broker.conf b/conf/broker.conf
index 3c1853adc5..199acd3692 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -28,14 +28,14 @@ configurationStoreServers=
# Broker data port
brokerServicePort=6650
-# Broker data port for TLS
-brokerServicePortTls=6651
+# Broker data port for TLS - By default TLS is disabled
+brokerServicePortTls=
# Port to use to server HTTP request
webServicePort=8080
-# Port to use to server HTTPS request
-webServicePortTls=8443
+# Port to use to server HTTPS request - By default TLS is disabled
+webServicePortTls=
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0
@@ -220,7 +220,7 @@ proxyRoles=
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false
-# Enable TLS
+# Deprecated - Use webServicePortTls and brokerServicePortTls instead
tlsEnabled=false
# Path for the TLS certificate file
diff --git a/conf/discovery.conf b/conf/discovery.conf
index b1b6f41017..907f546378 100644
--- a/conf/discovery.conf
+++ b/conf/discovery.conf
@@ -30,13 +30,13 @@ zookeeperSessionTimeoutMs=30000
servicePort=6650
# Port to use to server binary-proto-tls request
-servicePortTls=6651
+servicePortTls=
# Port that discovery service listen on
webServicePort=8080
# Port to use to server HTTPS request
-webServicePortTls=8443
+webServicePortTls=
# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
@@ -65,7 +65,7 @@ superUserRoles=
authorizationAllowWildcardsMatching=false
##### --- TLS --- #####
-# Enable TLS
+# Deprecated - Use servicePortTls and webServicePortTls instead
tlsEnabled=false
# Path for the TLS certificate file
diff --git a/conf/proxy.conf b/conf/proxy.conf
index ffa6c45fad..78dbe00390 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -43,13 +43,13 @@ zookeeperSessionTimeoutMs=30000
servicePort=6650
# The port to use to server binary Protobuf TLS requests
-servicePortTls=6651
+servicePortTls=
# Port that discovery service listen on
webServicePort=8080
# Port to use to server HTTPS request
-webServicePortTls=8443
+webServicePortTls=
# Path for the file used to determine the rotation status for the proxy
instance when responding
# to service discovery health checks
@@ -98,7 +98,7 @@ maxConcurrentLookupRequests=50000
##### --- TLS --- #####
-# Whether TLS is enabled for the proxy
+# Deprecated - use servicePortTls and webServicePortTls instead
tlsEnabledInProxy=false
# Whether TLS is enabled when communicating with Pulsar brokers
diff --git a/conf/websocket.conf b/conf/websocket.conf
index c8144fd3be..ab03e5687d 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -34,7 +34,7 @@ brokerServiceUrlTls=
# Port to use to server HTTP request
webServicePort=8080
# Port to use to server HTTPS request
-webServicePortTls=8443
+webServicePortTls=
# Path for the file used to determine the rotation status for the
proxy-instance when responding
# to service discovery health checks
@@ -79,6 +79,7 @@ authorizationAllowWildcardsMatching=false
superUserRoles=
# Authentication settings of the proxy itself. Used to connect to brokers
+brokerClientTlsEnabled=false;
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=
@@ -88,7 +89,7 @@ anonymousUserRole=
### --- TLS --- ###
-# Enable TLS
+# Deprecated - use webServicePortTls and brokerClientTlsEnabled instead
tlsEnabled=false
# Accept untrusted TLS certificate from client
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 34882bd16c..61a8775dd6 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -52,12 +52,12 @@
// Configuration Store connection string
@FieldContext(required = false)
private String configurationStoreServers;
- private int brokerServicePort = 6650;
- private int brokerServicePortTls = 6651;
+ private Integer brokerServicePort = 6650;
+ private Integer brokerServicePortTls = null;
// Port to use to server HTTP request
- private int webServicePort = 8080;
+ private Integer webServicePort = 8080;
// Port to use to server HTTPS request
- private int webServicePortTls = 8443;
+ private Integer webServicePortTls = null;
// Hostname or IP address the service binds on.
private String bindAddress = "0.0.0.0";
@@ -228,7 +228,7 @@
private int maxConsumersPerSubscription = 0;
/***** --- TLS --- ****/
- // Enable TLS
+ @Deprecated
private boolean tlsEnabled = false;
// Path for the TLS certificate file
private String tlsCertificateFilePath;
@@ -584,5 +584,20 @@ public void setProperties(Properties properties) {
public int getBookkeeperHealthCheckIntervalSec() {
return (int) bookkeeperClientHealthCheckIntervalSeconds;
}
+
+ public Optional<Integer> getBrokerServicePort() {
+ return Optional.ofNullable(brokerServicePort);
+ }
+
+ public Optional<Integer> getBrokerServicePortTls() {
+ return Optional.ofNullable(brokerServicePortTls);
+ }
+
+ public Optional<Integer> getWebServicePort() {
+ return Optional.ofNullable(webServicePort);
+ }
+ public Optional<Integer> getWebServicePortTls() {
+ return Optional.ofNullable(webServicePortTls);
+ }
}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index baa0e7558c..34590863a7 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -67,10 +67,10 @@ public void testConfigurationConverting() throws Exception {
// check whether converting correctly
assertEquals(serviceConfiguration.getZookeeperServers(),
"localhost:2181");
assertEquals(serviceConfiguration.getConfigurationStoreServers(),
"localhost:2184");
- assertEquals(serviceConfiguration.getBrokerServicePort(), 7650);
- assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651);
- assertEquals(serviceConfiguration.getWebServicePort(), 9080);
- assertEquals(serviceConfiguration.getWebServicePortTls(), 9443);
+ assertEquals(serviceConfiguration.getBrokerServicePort().get(), new
Integer(7650));
+ assertEquals(serviceConfiguration.getBrokerServicePortTls().get(), new
Integer(7651));
+ assertEquals(serviceConfiguration.getWebServicePort().get(), new
Integer(9080));
+ assertEquals(serviceConfiguration.getWebServicePortTls().get(), new
Integer(9443));
// check whether exception causes
try {
@@ -112,7 +112,7 @@ public void testPulsarConfiguraitonLoadingStream() throws
Exception {
assertEquals(serviceConfig.getBacklogQuotaDefaultLimitGB(), 18);
assertEquals(serviceConfig.getClusterName(), "usc");
assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(),
"role:my-role");
- assertEquals(serviceConfig.getBrokerServicePort(), 7777);
+ assertEquals(serviceConfig.getBrokerServicePort().get(), new
Integer(7777));
assertEquals(serviceConfig.getManagedLedgerDigestType(),
DigestType.CRC32C);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 0af322facb..1cf1b5f888 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -147,17 +147,17 @@ private static boolean argsContains(String[] args, String
arg) {
boolean useTls = workerConfig.isUseTls();
String localhost = "127.0.0.1";
String pulsarServiceUrl = useTls
- ? PulsarService.brokerUrlTls(localhost,
brokerConfig.getBrokerServicePortTls())
- : PulsarService.brokerUrl(localhost,
brokerConfig.getBrokerServicePort());
+ ? PulsarService.brokerUrlTls(localhost,
brokerConfig.getBrokerServicePortTls().get())
+ : PulsarService.brokerUrl(localhost,
brokerConfig.getBrokerServicePort().get());
String webServiceUrl = useTls
- ? PulsarService.webAddressTls(localhost,
brokerConfig.getWebServicePortTls())
- : PulsarService.webAddress(localhost,
brokerConfig.getWebServicePort());
+ ? PulsarService.webAddressTls(localhost,
brokerConfig.getWebServicePortTls().get())
+ : PulsarService.webAddress(localhost,
brokerConfig.getWebServicePort().get());
workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceUrl);
String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
brokerConfig.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
- workerConfig.setWorkerPort(brokerConfig.getWebServicePort());
+
workerConfig.setWorkerPort(brokerConfig.getWebServicePort().get());
workerConfig.setWorkerId(
"c-" + brokerConfig.getClusterName()
+ "-fw-" + hostname
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index d1f37c9dbd..8711bd84d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -272,8 +272,8 @@ void start() throws Exception {
workerConfig = WorkerConfig.load(this.getFnWorkerConfigFile());
}
// worker talks to local broker
- workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePort());
- workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" +
config.getWebServicePort());
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePort().get());
+ workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" +
config.getWebServicePort().get());
if (!this.isNoStreamStorage()) {
// only set the state storage service url when state is
enabled.
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" +
this.getStreamStoragePort());
@@ -281,7 +281,7 @@ void start() throws Exception {
String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
- workerConfig.setWorkerPort(config.getWebServicePort());
+ workerConfig.setWorkerPort(config.getWebServicePort().get());
workerConfig.setWorkerId(
"c-" + config.getClusterName()
+ "-fw-" + hostname
@@ -294,9 +294,9 @@ void start() throws Exception {
broker.start();
URL webServiceUrl = new URL(
- String.format("http://%s:%d", config.getAdvertisedAddress(),
config.getWebServicePort()));
+ String.format("http://%s:%d", config.getAdvertisedAddress(),
config.getWebServicePort().get()));
final String brokerServiceUrl = String.format("pulsar://%s:%d",
config.getAdvertisedAddress(),
- config.getBrokerServicePort());
+ config.getBrokerServicePort().get());
admin =
PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication(
config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()).build();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index 1f1007eec7..6cacaa53de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -49,7 +49,7 @@ public MessagingServiceShutdownHook(PulsarService service) {
public void run() {
if (service.getConfiguration() != null) {
LOG.info("messaging service shutdown hook started, lookup port="
- + service.getConfiguration().getWebServicePort() + ",
broker url=" + service.getBrokerServiceUrl());
+ + service.getConfiguration().getWebServicePort().get() +
", broker url=" + service.getBrokerServiceUrl());
}
ExecutorService executor = Executors.newSingleThreadExecutor(new
DefaultThreadFactory("shutdown-thread"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4f5bd7288f..74e91bee9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -470,7 +470,7 @@ public synchronized void brokerIsAFollowerNow() {
this.startWorkerService();
LOG.info("messaging service is ready, bootstrap service on
port={}, broker url={}, cluster={}, configs={}",
- config.getWebServicePort(), brokerServiceUrl,
config.getClusterName(),
+ config.getWebServicePort().get(), brokerServiceUrl,
config.getClusterName(),
ReflectionToStringBuilder.toString(config));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -855,7 +855,11 @@ public static String
advertisedAddress(ServiceConfiguration config) {
}
public static String brokerUrl(ServiceConfiguration config) {
- return brokerUrl(advertisedAddress(config),
config.getBrokerServicePort());
+ if (config.getBrokerServicePort().isPresent()) {
+ return brokerUrl(advertisedAddress(config),
config.getBrokerServicePort().get());
+ } else {
+ return null;
+ }
}
public static String brokerUrl(String host, int port) {
@@ -863,10 +867,10 @@ public static String brokerUrl(String host, int port) {
}
public static String brokerUrlTls(ServiceConfiguration config) {
- if (config.isTlsEnabled()) {
- return brokerUrlTls(advertisedAddress(config),
config.getBrokerServicePortTls());
+ if (config.getBrokerServicePortTls().isPresent()) {
+ return brokerUrlTls(advertisedAddress(config),
config.getBrokerServicePortTls().get());
} else {
- return "";
+ return null;
}
}
@@ -875,7 +879,11 @@ public static String brokerUrlTls(String host, int port) {
}
public static String webAddress(ServiceConfiguration config) {
- return webAddress(advertisedAddress(config),
config.getWebServicePort());
+ if (config.getWebServicePort().isPresent()) {
+ return webAddress(advertisedAddress(config),
config.getWebServicePort().get());
+ } else {
+ return null;
+ }
}
public static String webAddress(String host, int port) {
@@ -883,10 +891,10 @@ public static String webAddress(String host, int port) {
}
public static String webAddressTls(ServiceConfiguration config) {
- if (config.isTlsEnabled()) {
- return webAddressTls(advertisedAddress(config),
config.getWebServicePortTls());
+ if (config.getWebServicePortTls().isPresent()) {
+ return webAddressTls(advertisedAddress(config),
config.getWebServicePortTls().get());
} else {
- return "";
+ return null;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 79e21d4b90..fed0004ee0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -289,8 +289,8 @@ protected void validateTopicName(String property, String
cluster, String namespa
protected void validateBrokerName(String broker) throws
MalformedURLException {
String brokerUrl = String.format("http://%s", broker);
String brokerUrlTls = String.format("https://%s", broker);
- if (!pulsar().getWebServiceAddress().equals(brokerUrl)
- && !pulsar().getWebServiceAddressTls().equals(brokerUrlTls)) {
+ if (!brokerUrl.equals(pulsar().getWebServiceAddress())
+ && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, "Invalid broker url %s", broker);
String host = parts[0];
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 5773c61ee1..087eb2804d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -54,7 +54,7 @@
@Override
public void initialize(PulsarService pulsar) {
- lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getWebServicePort();
+ lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getWebServicePort().get();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s",
lookupServiceAddress),
new PulsarResourceDescription());
zkClient = pulsar.getZkClient();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 45a31f5c47..997d9bb9d9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -783,7 +783,7 @@ public void start() throws PulsarServerException {
// Register the brokers in zk list
createZPathIfNotExists(zkClient,
LoadManager.LOADBALANCE_BROKERS_ROOT);
- String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ conf.getWebServicePort();
+ String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ conf.getWebServicePort().get();
brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" +
lookupServiceAddress;
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" +
lookupServiceAddress;
updateLocalBrokerData();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index da4c0e3e6d..023ce3b1a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -283,7 +283,7 @@ public void start() throws PulsarServerException {
// ignore the exception, node might be present already
}
}
- String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ conf.getWebServicePort();
+ String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ conf.getWebServicePort().get();
brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" +
lookupServiceAddress;
LoadReport loadReport = null;
try {
@@ -1113,7 +1113,7 @@ private LoadReport generateLoadReportForcefully() throws
Exception {
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
loadReport.setName(String.format("%s:%s",
pulsar.getAdvertisedAddress(),
- pulsar.getConfiguration().getWebServicePort()));
+ pulsar.getConfiguration().getWebServicePort().get()));
loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
SystemResourceUsage systemResourceUsage =
this.getSystemResourceUsage();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index ab8aef7368..9b4fcaef39 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -487,7 +487,7 @@ private boolean isBrokerActive(String candidateBroker)
throws KeeperException, I
if (LOG.isDebugEnabled()) {
LOG.debug("Broker not found for SLA Monitoring Namespace {}",
- candidateBroker + ":" + config.getWebServicePort());
+ candidateBroker + ":" + config.getWebServicePort().get());
}
return false;
}
@@ -972,11 +972,22 @@ public void unloadSLANamespace() throws Exception {
}
public static String getHeartbeatNamespace(String host,
ServiceConfiguration config) {
- return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(),
host, config.getWebServicePort());
+ Integer port = null;
+ if (config.getWebServicePort().isPresent()) {
+ port = config.getWebServicePort().get();
+ } else if (config.getWebServicePortTls().isPresent()) {
+ port = config.getWebServicePortTls().get();
+ }
+ return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(),
host, port);
}
-
- public static String getSLAMonitorNamespace(String host,
ServiceConfiguration config) {
- return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host,
config.getWebServicePort());
+ public static String getSLAMonitorNamespace(String host,
ServiceConfiguration config) {
+ Integer port = null;
+ if (config.getWebServicePort().isPresent()) {
+ port = config.getWebServicePort().get();
+ } else if (config.getWebServicePortTls().isPresent()) {
+ port = config.getWebServicePortTls().get();
+ }
+ return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host,
port);
}
public static String checkHeartbeatNamespace(ServiceUnitId ns) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 16352a4f49..a43e404dba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -135,8 +135,6 @@
private final PulsarService pulsar;
private final ManagedLedgerFactory managedLedgerFactory;
- private final int port;
- private final int tlsPort;
private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>> topics;
@@ -192,8 +190,6 @@
public BrokerService(PulsarService pulsar) throws Exception {
this.pulsar = pulsar;
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
- this.port = new URI(pulsar.getBrokerServiceUrl()).getPort();
- this.tlsPort = new URI(pulsar.getBrokerServiceUrlTls()).getPort();
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
this.keepAliveIntervalSeconds =
pulsar.getConfiguration().getKeepAliveIntervalSeconds();
@@ -293,20 +289,24 @@ public void start() throws Exception {
bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
- // Bind and start to accept incoming connections.
- InetSocketAddress addr = new
InetSocketAddress(pulsar.getBindAddress(), port);
- try {
- bootstrap.bind(addr).sync();
- } catch (Exception e) {
- throw new IOException("Failed to bind Pulsar broker on " + addr,
e);
+ Optional<Integer> port = serviceConfig.getBrokerServicePort();
+ if (port.isPresent()) {
+ // Bind and start to accept incoming connections.
+ InetSocketAddress addr = new
InetSocketAddress(pulsar.getBindAddress(), port.get());
+ try {
+ bootstrap.bind(addr).sync();
+ } catch (Exception e) {
+ throw new IOException("Failed to bind Pulsar broker on " +
addr, e);
+ }
+ log.info("Started Pulsar Broker service on port {}", port.get());
}
- log.info("Started Pulsar Broker service on port {}", port);
-
- if (serviceConfig.isTlsEnabled()) {
+
+ Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
+ if (tlsPort.isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar,
true));
- tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(),
tlsPort)).sync();
- log.info("Started Pulsar Broker TLS service on port {} - TLS
provider: {}", tlsPort,
+ tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(),
tlsPort.get())).sync();
+ log.info("Started Pulsar Broker TLS service on port {} - TLS
provider: {}", tlsPort.get(),
SslContext.defaultServerProvider());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index a9557116e5..81e5a40a31 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -320,7 +320,7 @@ protected void validateClusterOwnership(String cluster)
throws WebApplicationExc
private URI getRedirectionUrl(ClusterData differentClusterData) throws
MalformedURLException {
URL webUrl = null;
- if (isRequestHttps() && pulsar.getConfiguration().isTlsEnabled()
+ if (isRequestHttps() &&
pulsar.getConfiguration().getWebServicePortTls().isPresent()
&&
StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
webUrl = new URL(differentClusterData.getServiceUrlTls());
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 02243b9558..4ca8fcf70a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -27,6 +27,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TimeZone;
import javax.servlet.DispatcherType;
@@ -81,12 +82,16 @@ public WebService(PulsarService pulsar) throws
PulsarServerException {
this.server = new Server(webServiceExecutor);
List<ServerConnector> connectors = new ArrayList<>();
- ServerConnector connector = new PulsarServerConnector(server, 1, 1);
- connector.setPort(pulsar.getConfiguration().getWebServicePort());
- connector.setHost(pulsar.getBindAddress());
- connectors.add(connector);
+ Optional<Integer> port = pulsar.getConfiguration().getWebServicePort();
+ if (port.isPresent()) {
+ ServerConnector connector = new PulsarServerConnector(server, 1,
1);
+ connector.setPort(port.get());
+ connector.setHost(pulsar.getBindAddress());
+ connectors.add(connector);
+ }
- if (pulsar.getConfiguration().isTlsEnabled()) {
+ Optional<Integer> tlsPort =
pulsar.getConfiguration().getWebServicePortTls();
+ if (tlsPort.isPresent()) {
try {
SslContextFactory sslCtxFactory =
SecurityUtility.createSslContextFactory(
pulsar.getConfiguration().isTlsAllowInsecureConnection(),
@@ -95,7 +100,7 @@ public WebService(PulsarService pulsar) throws
PulsarServerException {
pulsar.getConfiguration().getTlsKeyFilePath(),
pulsar.getConfiguration().isTlsRequireTrustedClientCertOnConnect());
ServerConnector tlsConnector = new
PulsarServerConnector(server, 1, 1, sslCtxFactory);
-
tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls());
+ tlsConnector.setPort(tlsPort.get());
tlsConnector.setHost(pulsar.getBindAddress());
connectors.add(tlsConnector);
} catch (GeneralSecurityException e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index c16d6767cd..081662534b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -79,18 +79,23 @@ public static void main(String[] args) throws Exception {
arguments.brokerConfigFile, ServiceConfiguration.class);
}
- String pulsarServiceUrl = PulsarService.brokerUrl(brokerConfig);
ClientBuilder clientBuilder = PulsarClient.builder();
if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
brokerConfig.getBrokerClientAuthenticationParameters());
}
- clientBuilder.serviceUrl(pulsarServiceUrl)
- .enableTls(brokerConfig.isTlsEnabled())
-
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
-
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
+
+ if (brokerConfig.getBrokerServicePortTls().isPresent()) {
+ clientBuilder.serviceUrl(PulsarService.brokerUrlTls(brokerConfig))
+
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
+ .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
+
+ } else {
+ clientBuilder.serviceUrl(PulsarService.brokerUrl(brokerConfig));
+ }
+
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 233d6b7b36..cddf3abb55 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -217,8 +217,8 @@ public void testUnloadIfBrokerCrashes() {
try {
pulsarServices[crashIndex] = new
PulsarService(configurations[crashIndex]);
pulsarServices[crashIndex].start();
-
assertEquals(pulsarServices[crashIndex].getConfiguration().getBrokerServicePort(),
- brokerNativeBrokerPorts[crashIndex]);
+
assertEquals(pulsarServices[crashIndex].getConfiguration().getBrokerServicePort().get(),
+ new Integer(brokerNativeBrokerPorts[crashIndex]));
} catch (PulsarServerException e) {
e.printStackTrace();
fail("The broker should be able to start without exception");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 01160fec8d..f7c7e70359 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -139,7 +139,6 @@
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 0b04ba3c5f..3f90392063 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -835,7 +835,7 @@ public void brokerNamespaceIsolationPolicies() throws
Exception {
String namespaceRegex = "other/use/other.*";
String cluster = "use";
String brokerName = pulsar.getAdvertisedAddress();
- String brokerAddress = brokerName + ":" +
pulsar.getConfiguration().getWebServicePort();
+ String brokerAddress = brokerName + ":" +
pulsar.getConfiguration().getWebServicePort().get();
NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData();
nsPolicyData1.namespaces = new ArrayList<String>();
nsPolicyData1.namespaces.add(namespaceRegex);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
index ca1817a053..b4e2a919ab 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
@@ -61,7 +61,6 @@ private static String getTLSFile(String name) {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
index 2cfb009ac7..e478631513 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -83,7 +83,6 @@ public void setup() throws Exception {
private void buildConf(ServiceConfiguration conf) {
conf.setLoadBalancerEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 803f8af028..8da7698d88 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -138,7 +138,6 @@
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 8088637855..befc46da0d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -174,7 +174,7 @@ void shutdown() throws Exception {
private void createCluster(ZooKeeper zk, ServiceConfiguration config)
throws Exception {
ZkUtils.createFullPathOptimistic(zk, "/admin/clusters/" +
config.getClusterName(),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
- new ClusterData("http://" +
config.getAdvertisedAddress() + ":" + config.getWebServicePort())),
+ new ClusterData("http://" +
config.getAdvertisedAddress() + ":" + config.getWebServicePort().get())),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index b90fa6faa2..a959dee75a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -148,7 +148,7 @@ void setup() throws Exception {
pulsarServices[i].start();
brokerUrls[i] = new URL("http://127.0.0.1" + ":" +
brokerWebServicePorts[i]);
- lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() +
":" + config.getWebServicePort();
+ lookupAddresses[i] = pulsarServices[i].getAdvertisedAddress() +
":" + config.getWebServicePort().get();
pulsarAdmins[i] =
PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 78b591d2e8..aaa60794e7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -251,7 +251,7 @@ public void testPrimary() throws Exception {
rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024));
ResourceUnit ru1 = new SimpleResourceUnit(
- "http://" + pulsar1.getAdvertisedAddress() + ":" +
pulsar1.getConfiguration().getWebServicePort(), rd);
+ "http://" + pulsar1.getAdvertisedAddress() + ":" +
pulsar1.getConfiguration().getWebServicePort().get(), rd);
Set<ResourceUnit> rus = new HashSet<ResourceUnit>();
rus.add(ru1);
LoadRanker lr = new ResourceAvailabilityRanker();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index dfca1b2c06..1c11f00b0d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -93,7 +93,8 @@ public void setup() throws Exception {
doReturn(localCache).when(pulsar).getLocalZkCacheService();
doReturn(config).when(pulsar).getConfiguration();
doReturn(nsService).when(pulsar).getNamespaceService();
- doReturn(port).when(config).getBrokerServicePort();
+ doReturn(Optional.ofNullable(new
Integer(port))).when(config).getBrokerServicePort();
+ doReturn(Optional.ofNullable(null)).when(config).getWebServicePort();
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(webAddress(config)).when(pulsar).getWebServiceAddress();
doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 54d1644975..4e9f4e986f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -408,7 +408,6 @@ public void testTlsDisabled() throws Exception {
PulsarClient pulsarClient = null;
conf.setAuthenticationEnabled(false);
- conf.setTlsEnabled(false);
restartBroker();
// Case 1: Access without TLS
@@ -447,7 +446,6 @@ public void testTlsEnabled() throws Exception {
final String subName = "newSub";
conf.setAuthenticationEnabled(false);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
restartBroker();
@@ -525,7 +523,6 @@ public void testTlsAuthAllowInsecure() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
@@ -584,7 +581,6 @@ public void testTlsAuthDisallowInsecure() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
@@ -642,7 +638,6 @@ public void testTlsAuthUseTrustCert() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b65b13fb44..9895e7bc52 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1165,7 +1165,7 @@ public void testAtomicReplicationRemoval() throws
Exception {
ConcurrentOpenHashMap<String, Replicator> replicatorMap =
topic.getReplicators();
final URL brokerUrl = new URL(
- "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort());
+ "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort().get());
PulsarClient client =
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
@@ -1209,7 +1209,7 @@ public void testClosingReplicationProducerTwice() throws
Exception {
PersistentTopic topic = new PersistentTopic(globalTopicName,
ledgerMock, brokerService);
final URL brokerUrl = new URL(
- "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort());
+ "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort().get());
PulsarClient client =
spy(PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
doReturn(new CompletableFuture<Producer>()).when(clientImpl)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 10754b919e..8dd10f4b94 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -125,7 +125,6 @@ void setup() throws Exception {
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config1.setBrokerServicePort(PortManager.nextFreePort());
config1.setBrokerServicePortTls(PortManager.nextFreePort());
- config1.setTlsEnabled(true);
config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
@@ -159,7 +158,6 @@ void setup() throws Exception {
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config2.setBrokerServicePort(PortManager.nextFreePort());
config2.setBrokerServicePortTls(PortManager.nextFreePort());
- config2.setTlsEnabled(true);
config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
index cdc7c06007..00b69a875a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
@@ -124,7 +124,6 @@ void setup() throws Exception {
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config1.setBrokerServicePort(PortManager.nextFreePort());
config1.setBrokerServicePortTls(PortManager.nextFreePort());
- config1.setTlsEnabled(true);
config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
@@ -158,7 +157,6 @@ void setup() throws Exception {
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config2.setBrokerServicePort(PortManager.nextFreePort());
config2.setBrokerServicePortTls(PortManager.nextFreePort());
- config2.setTlsEnabled(true);
config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 30e455412d..38cf4cc8aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -243,13 +243,14 @@ private void setupEnv(boolean enableFilter, String
minApiVersion, boolean allowU
ServiceConfiguration config = new ServiceConfiguration();
config.setAdvertisedAddress("localhost");
config.setWebServicePort(BROKER_WEBSERVICE_PORT);
- config.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
+ if (enableTls) {
+ config.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
+ }
config.setClientLibraryVersionCheckEnabled(enableFilter);
config.setAuthenticationEnabled(enableAuth);
config.setAuthenticationProviders(providers);
config.setAuthorizationEnabled(false);
config.setSuperUserRoles(roles);
- config.setTlsEnabled(enableTls);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(allowInsecure);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index d2f6a8d057..4923894c66 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -71,7 +71,6 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index fef021e4c7..dbd893a9e1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -72,7 +72,6 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsAllowInsecureConnection(true);
Set<String> superUserRoles = new HashSet<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 7cf4843824..54b10ba7c8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -169,7 +169,7 @@ public void testMultipleBrokerLookup() throws Exception {
/**** started broker-2 ****/
- URI brokerServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort());
+ URI brokerServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort().get());
PulsarClient pulsarClient2 =
PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build();
// load namespace-bundle by calling Broker2
@@ -228,7 +228,7 @@ public void testMultipleBrokerDifferentClusterLookup()
throws Exception {
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(newCluster); // Broker2 serves newCluster
conf2.setZookeeperServers("localhost:2181");
- String broker2ServiceUrl = "pulsar://localhost:" +
conf2.getBrokerServicePort();
+ String broker2ServiceUrl = "pulsar://localhost:" +
conf2.getBrokerServicePort().get();
admin.clusters().createCluster(newCluster,
new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT,
null, broker2ServiceUrl, null));
@@ -394,7 +394,6 @@ public void testWebserviceServiceTls() throws Exception {
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setTlsAllowInsecureConnection(true);
- conf2.setTlsEnabled(true);
conf2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf2.setClusterName(conf.getClusterName());
@@ -403,7 +402,6 @@ public void testWebserviceServiceTls() throws Exception {
// restart broker1 with tls enabled
conf.setTlsAllowInsecureConnection(true);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
stopBroker();
@@ -429,7 +427,7 @@ public void testWebserviceServiceTls() throws Exception {
/**** started broker-2 ****/
- URI brokerServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort());
+ URI brokerServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort().get());
PulsarClient pulsarClient2 =
PulsarClient.builder().serviceUrl(brokerServiceUrl.toString()).build();
final String lookupResourceUrl =
"/lookup/v2/topic/persistent/my-property/my-ns/my-topic1";
@@ -455,11 +453,11 @@ public void testWebserviceServiceTls() throws Exception {
con.connect();
log.info("connected url: {} ", con.getURL());
// assert connect-url: broker2-https
- assertEquals(con.getURL().getPort(), conf2.getWebServicePortTls());
+ assertEquals(new Integer(con.getURL().getPort()),
conf2.getWebServicePortTls().get());
InputStream is = con.getInputStream();
// assert redirect-url: broker1-https only
log.info("redirected url: {}", con.getURL());
- assertEquals(con.getURL().getPort(), conf.getWebServicePortTls());
+ assertEquals(new Integer(con.getURL().getPort()),
conf.getWebServicePortTls().get());
is.close();
pulsarClient2.close();
@@ -532,7 +530,6 @@ public void testDiscoveryLookupTls() throws Exception {
// (1) restart broker1 with tls enabled
conf.setTlsAllowInsecureConnection(true);
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
stopBroker();
@@ -542,7 +539,6 @@ public void testDiscoveryLookupTls() throws Exception {
ServiceConfig config = new ServiceConfig();
config.setServicePort(nextFreePort());
config.setServicePortTls(nextFreePort());
- config.setTlsEnabled(true);
config.setBindOnLocalhost(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
@@ -836,7 +832,7 @@ public void testSplitUnloadLookupTest() throws Exception {
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new
AtomicReference<>(loadManager1));
- URI broker2ServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort());
+ URI broker2ServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort().get());
PulsarClient pulsarClient2 =
PulsarClient.builder().serviceUrl(broker2ServiceUrl.toString()).build();
// (3) Broker-2 receives topic-1 request, creates local-policies and
sets the watch
@@ -946,7 +942,7 @@ public void testModularLoadManagerSplitBundle() throws
Exception {
doReturn(res).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new
AtomicReference<>(loadManager1));
- URI broker2ServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort());
+ URI broker2ServiceUrl = new URI("pulsar://localhost:" +
conf2.getBrokerServicePort().get());
PulsarClient pulsarClient2 =
PulsarClient.builder().serviceUrl(broker2ServiceUrl.toString()).build();
// (3) Broker-2 receives topic-1 request, creates local-policies
and sets the watch
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 4b73b9ef6d..73a51adba9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -102,6 +102,8 @@ public void
testCreateClientWithAutoChangedServiceUrlProvider() throws Exception
PulsarService pulsarService1 = pulsar;
conf.setBrokerServicePort(PortManager.nextFreePort());
conf.setWebServicePort(PortManager.nextFreePort());
+ conf.setBrokerServicePortTls(PortManager.nextFreePort());
+ conf.setWebServicePortTls(PortManager.nextFreePort());
startBroker();
PulsarService pulsarService2 = pulsar;
System.out.println("Pulsar1=" + pulsarService1.getBrokerServiceUrl() +
", Pulsar2=" + pulsarService2.getBrokerServiceUrl());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index 8e40011575..5d966cb6bb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -58,7 +58,6 @@ protected void cleanup() throws Exception {
}
protected void internalSetUpForBroker() throws Exception {
- conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 4bb4fb9945..fd51c06702 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -56,7 +56,8 @@ public void testInit() throws Exception {
InputStream newStream = updateProp(zookeeperServer,
String.valueOf(brokerServicePort), "ns1,ns2");
final ServiceConfiguration config =
PulsarConfigurationLoader.create(newStream, ServiceConfiguration.class);
assertTrue(isNotBlank(config.getZookeeperServers()));
- assertTrue(config.getBrokerServicePort() == brokerServicePort);
+ assertTrue(config.getBrokerServicePort().isPresent()
+ &&
config.getBrokerServicePort().get().equals(brokerServicePort));
assertEquals(config.getBootstrapNamespaces().get(1), "ns2");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 5210491eed..5cdf5a446a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -148,8 +148,8 @@ private WorkerService
createPulsarFunctionWorker(ServiceConfiguration config) {
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setThreadContainerFactory(new
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
// worker talks to local broker
- workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePort());
- workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" +
config.getWebServicePort());
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePort().get());
+ workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" +
config.getWebServicePort().get());
workerConfig.setFailureCheckFreqMs(100);
workerConfig.setNumFunctionPackageReplicas(1);
workerConfig.setClusterCoordinationTopicName("coordinate");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index a2a5b283cd..b9c21628fd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -117,7 +117,6 @@ void setup(Method method) throws Exception {
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(providers);
- config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
@@ -180,8 +179,8 @@ private WorkerService
createPulsarFunctionWorker(ServiceConfiguration config) {
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setThreadContainerFactory(new
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
// worker talks to local broker
- workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePortTls());
- workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" +
config.getWebServicePortTls());
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePortTls().get());
+ workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" +
config.getWebServicePortTls().get());
workerConfig.setFailureCheckFreqMs(100);
workerConfig.setNumFunctionPackageReplicas(1);
workerConfig.setClusterCoordinationTopicName("coordinate");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 1980bced1b..7841453c20 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -149,7 +149,6 @@ void setup(Method method) throws Exception {
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(providers);
- config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
@@ -212,8 +211,8 @@ private WorkerService
createPulsarFunctionWorker(ServiceConfiguration config) {
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setThreadContainerFactory(new
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
// worker talks to local broker
- workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePortTls());
- workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" +
config.getWebServicePortTls());
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePortTls().get());
+ workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" +
config.getWebServicePortTls().get());
workerConfig.setFailureCheckFreqMs(100);
workerConfig.setNumFunctionPackageReplicas(1);
workerConfig.setClusterCoordinationTopicName("coordinate");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index fb081e5bf7..a37786ff07 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -106,7 +106,6 @@ void setup(Method method) throws Exception {
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(providers);
- config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
@@ -166,8 +165,8 @@ private WorkerService
createPulsarFunctionWorker(ServiceConfiguration config) {
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setThreadContainerFactory(new
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
// worker talks to local broker
- workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePortTls());
- workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" +
config.getWebServicePortTls());
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePort().get());
+ workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" +
config.getWebServicePort().get());
workerConfig.setFailureCheckFreqMs(100);
workerConfig.setNumFunctionPackageReplicas(1);
workerConfig.setClusterCoordinationTopicName("coordinate");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 9ddf955484..6f08cc8595 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -407,7 +407,7 @@ public void testProxyStats() throws Exception {
Client client = ClientBuilder.newClient(new
ClientConfig().register(LoggingFeature.class));
final String baseUrl = pulsar.getWebServiceAddress()
-
.replace(Integer.toString(pulsar.getConfiguration().getWebServicePort()),
(Integer.toString(port)))
+
.replace(Integer.toString(pulsar.getConfiguration().getWebServicePort().get()),
(Integer.toString(port)))
+ "/admin/v2/proxy-stats/";
// verify proxy metrics
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index 650e05e139..96f3c758e9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -67,7 +67,7 @@ public void setup() throws Exception {
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(port);
config.setWebServicePortTls(tlsPort);
- config.setTlsEnabled(true);
+ config.setBrokerClientTlsEnabled(true);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index c02baaa1ef..ee8531d3b0 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.discovery.service;
+import com.google.common.base.Preconditions;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.Closeable;
@@ -104,15 +105,23 @@ public void startServer() throws Exception {
bootstrap.childHandler(new ServiceChannelInitializer(this, config,
false));
// Bind and start to accept incoming connections.
- bootstrap.bind(config.getServicePort()).sync();
- LOG.info("Started Pulsar Discovery service on port {}",
config.getServicePort());
-
- if (config.isTlsEnabled()) {
+
+ Preconditions.checkArgument(config.getServicePort().isPresent() ||
config.getServicePortTls().isPresent(),
+ "Either ServicePort or ServicePortTls should be configured.");
+
+ if (config.getServicePort().isPresent()) {
+ // Bind and start to accept incoming connections.
+ bootstrap.bind(config.getServicePort().get()).sync();
+ LOG.info("Started Pulsar Discovery service on port {}",
config.getServicePort());
+ }
+
+ if (config.getServicePortTls().isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new ServiceChannelInitializer(this,
config, true));
- tlsBootstrap.bind(config.getServicePortTls()).sync();
- LOG.info("Started Pulsar Discovery TLS service on port {}",
config.getServicePortTls());
+ tlsBootstrap.bind(config.getServicePortTls().get()).sync();
+ LOG.info("Started Pulsar Discovery TLS service on port {}",
config.getServicePortTls().get());
}
+
}
public ZooKeeperClientFactory getZooKeeperClientFactory() {
@@ -153,15 +162,20 @@ public String host() {
}
public String serviceUrl() {
- return new
StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePort()).toString();
+ if (config.getServicePort().isPresent()) {
+ return new
StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePort().get())
+ .toString();
+ } else {
+ return null;
+ }
}
public String serviceUrlTls() {
- if (config.isTlsEnabled()) {
- return new
StringBuilder("pulsar+ssl://").append(host()).append(":").append(config.getServicePortTls())
+ if (config.getServicePortTls().isPresent()) {
+ return new
StringBuilder("pulsar+ssl://").append(host()).append(":").append(config.getServicePortTls().get())
.toString();
} else {
- return "";
+ return null;
}
}
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
index c87cd7da60..4cd32396dc 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
@@ -53,21 +53,21 @@
private final Server server;
private final ExecutorThreadPool webServiceExecutor;
private final List<Handler> handlers = Lists.newArrayList();
- protected final int externalServicePort;
public ServerManager(ServiceConfig config) {
this.webServiceExecutor = new ExecutorThreadPool();
this.webServiceExecutor.setName("pulsar-external-web");
this.server = new Server(webServiceExecutor);
- this.externalServicePort = config.getWebServicePort();
List<ServerConnector> connectors = Lists.newArrayList();
- ServerConnector connector = new ServerConnector(server, 1, 1);
- connector.setPort(externalServicePort);
- connectors.add(connector);
+ if (config.getWebServicePort().isPresent()) {
+ ServerConnector connector = new ServerConnector(server, 1, 1);
+ connector.setPort(config.getWebServicePort().get());
+ connectors.add(connector);
+ }
- if (config.isTlsEnabled()) {
+ if (config.getWebServicePortTls().isPresent()) {
try {
SslContextFactory sslCtxFactory =
SecurityUtility.createSslContextFactory(
config.isTlsAllowInsecureConnection(),
@@ -76,7 +76,7 @@ public ServerManager(ServiceConfig config) {
config.getTlsKeyFilePath(),
config.getTlsRequireTrustedClientCertOnConnect());
ServerConnector tlsConnector = new ServerConnector(server, 1,
1, sslCtxFactory);
- tlsConnector.setPort(config.getWebServicePortTls());
+ tlsConnector.setPort(config.getWebServicePortTls().get());
connectors.add(tlsConnector);
} catch (GeneralSecurityException e) {
throw new RestException(e);
@@ -102,10 +102,6 @@ public void addServlet(String path, Class<? extends
Servlet> servlet, Map<String
handlers.add(context);
}
- public int getExternalServicePort() {
- return externalServicePort;
- }
-
public void start() throws Exception {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index c024c19881..bc7900eb27 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.discovery.service.server;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -45,13 +46,13 @@
private int zookeeperSessionTimeoutMs = 30_000;
// Port to use to server binary-proto request
- private int servicePort = 5000;
+ private Integer servicePort = 5000;
// Port to use to server binary-proto-tls request
- private int servicePortTls = 5001;
+ private Integer servicePortTls;
// Port to use to server HTTP request
- private int webServicePort = 8080;
+ private Integer webServicePort = 8080;
// Port to use to server HTTPS request
- private int webServicePortTls = 8443;
+ private Integer webServicePortTls;
// Control whether to bind directly on localhost rather than on normal
// hostname
private boolean bindOnLocalhost = false;
@@ -75,7 +76,7 @@
private String authorizationProvider =
PulsarAuthorizationProvider.class.getName();
/***** --- TLS --- ****/
- // Enable TLS
+ @Deprecated
private boolean tlsEnabled = false;
// Path for the TLS certificate file
private String tlsCertificateFilePath;
@@ -131,42 +132,44 @@ public void setZookeeperSessionTimeoutMs(int
zookeeperSessionTimeoutMs) {
this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
}
- public int getServicePort() {
- return servicePort;
+ public Optional<Integer> getServicePort() {
+ return Optional.ofNullable(servicePort);
}
public void setServicePort(int servicePort) {
this.servicePort = servicePort;
}
- public int getServicePortTls() {
- return servicePortTls;
+ public Optional<Integer> getServicePortTls() {
+ return Optional.ofNullable(servicePortTls);
}
public void setServicePortTls(int servicePortTls) {
this.servicePortTls = servicePortTls;
}
- public int getWebServicePort() {
- return webServicePort;
+ public Optional<Integer> getWebServicePort() {
+ return Optional.ofNullable(webServicePort);
}
public void setWebServicePort(int webServicePort) {
this.webServicePort = webServicePort;
}
- public int getWebServicePortTls() {
- return webServicePortTls;
+ public Optional<Integer> getWebServicePortTls() {
+ return Optional.ofNullable(webServicePortTls);
}
public void setWebServicePortTls(int webServicePortTls) {
this.webServicePortTls = webServicePortTls;
}
+ @Deprecated
public boolean isTlsEnabled() {
- return tlsEnabled;
+ return tlsEnabled || webServicePortTls != null || servicePortTls !=
null;
}
+ @Deprecated
public void setTlsEnabled(boolean tlsEnabled) {
this.tlsEnabled = tlsEnabled;
}
diff --git
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
index 540d3b826e..88e4a7fe0d 100644
---
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
+++
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
@@ -53,7 +53,6 @@ protected void setup() throws Exception {
config.setServicePortTls(nextFreePort());
config.setBindOnLocalhost(true);
- config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
diff --git
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
index 648f57415a..452daaae88 100644
---
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
+++
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -194,7 +194,6 @@ public void testTlsEnable() throws Exception {
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
config.setWebServicePortTls(tlsPort);
- config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
ServerManager server = new ServerManager(config);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0cc214cc87..d49722f871 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -55,8 +55,8 @@
private String workerId;
private String workerHostname;
- private int workerPort;
- private int workerPortTls;
+ private Integer workerPort;
+ private Integer workerPortTls;
private String connectorsDirectory = "./connectors";
private String functionMetadataTopicName;
private String functionWebServiceUrl;
@@ -80,7 +80,7 @@
// Frequency how often worker performs compaction on function-topics
private long topicCompactionFrequencySec = 30 * 60; // 30 minutes
/***** --- TLS --- ****/
- // Enable TLS
+ @Deprecated
private boolean tlsEnabled = false;
// Path for the TLS certificate file
private String tlsCertificateFilePath;
@@ -91,6 +91,7 @@
// Accept untrusted TLS certificate from client
private boolean tlsAllowInsecureConnection = false;
private boolean tlsRequireTrustedClientCertOnConnect = false;
+ // TLS for Functions -> Broker
private boolean useTls = false;
private boolean tlsHostnameVerificationEnable = false;
// Enforce authentication
@@ -104,6 +105,11 @@
private Properties properties = new Properties();
+ public boolean getTlsEnabled() {
+ return tlsEnabled || workerPortTls != null;
+ }
+
+
@Data
@Setter
@Getter
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 2a3e04cf51..0e280377e2 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -115,7 +115,7 @@ private void init() {
handlerCollection.setHandlers(new Handler[] { contexts, new
DefaultHandler(), requestLogHandler });
server.setHandler(handlerCollection);
- if (this.workerConfig.isTlsEnabled()) {
+ if (this.workerConfig.getWorkerPortTls() != null) {
try {
SslContextFactory sslCtxFactory =
SecurityUtility.createSslContextFactory(
this.workerConfig.isTlsAllowInsecureConnection(),
this.workerConfig.getTlsTrustCertsFilePath(),
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
index 4c6d24912f..1d60aa41f0 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
@@ -45,7 +45,7 @@ public void testGetterSetter() {
assertEquals("sample/standalone/functions",
wc.getPulsarFunctionsNamespace());
assertEquals(3, wc.getNumFunctionPackageReplicas());
assertEquals(TEST_NAME + "-worker", wc.getWorkerId());
- assertEquals(1234, wc.getWorkerPort());
+ assertEquals(new Integer(1234), wc.getWorkerPort());
}
@Test
@@ -57,7 +57,7 @@ public void testLoadWorkerConfig() throws Exception {
assertEquals("test-function-metadata-topic",
wc.getFunctionMetadataTopicName());
assertEquals(3, wc.getNumFunctionPackageReplicas());
assertEquals("test-worker", wc.getWorkerId());
- assertEquals(7654, wc.getWorkerPort());
+ assertEquals(new Integer(7654), wc.getWorkerPort());
}
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 45eba27c9d..717019b8df 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -129,23 +130,23 @@
category = CATEGORY_SERVER,
doc = "The port for serving binary protobuf request"
)
- private int servicePort = 6650;
+ private Integer servicePort = 6650;
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving tls secured binary protobuf request"
)
- private int servicePortTls = 6651;
+ private Integer servicePortTls;
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving http requests"
)
- private int webServicePort = 8080;
+ private Integer webServicePort = 8080;
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving https requests"
)
- private int webServicePortTls = 8443;
+ private Integer webServicePortTls;
@FieldContext(
category = CATEGORY_SERVER,
@@ -225,10 +226,8 @@
private boolean tlsEnabledWithBroker = false;
/***** --- TLS --- ****/
- @FieldContext(
- category = CATEGORY_TLS,
- doc = "Whether TLS is enabled for the proxy"
- )
+
+ @Deprecated
private boolean tlsEnabledInProxy = false;
@FieldContext(
@@ -332,6 +331,22 @@ public Properties getProperties() {
return properties;
}
+ public Optional<Integer> getServicePort() {
+ return Optional.ofNullable(servicePort);
+ }
+
+ public Optional<Integer> getServicePortTls() {
+ return Optional.ofNullable(servicePortTls);
+ }
+
+ public Optional<Integer> getWebServicePort() {
+ return Optional.ofNullable(webServicePort);
+ }
+
+ public Optional<Integer> getWebServicePortTls() {
+ return Optional.ofNullable(webServicePortTls);
+ }
+
public void setProperties(Properties properties) {
this.properties = properties;
@@ -387,4 +402,4 @@ public String toString() {
return String.format("HttpReverseProxyConfig(%s, path=%s,
proxyTo=%s)", name, path, proxyTo);
}
}
-}
+}
\ No newline at end of file
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index eaf5f04fe7..317daa39f3 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -104,8 +104,17 @@ public ProxyService(ProxyConfiguration proxyConfig,
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
- this.serviceUrl = String.format("pulsar://%s:%d/", hostname,
proxyConfig.getServicePort());
- this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname,
proxyConfig.getServicePortTls());
+ if (proxyConfig.getServicePort().isPresent()) {
+ this.serviceUrl = String.format("pulsar://%s:%d/", hostname,
proxyConfig.getServicePort().get());
+ } else {
+ this.serviceUrl = null;
+ }
+
+ if (proxyConfig.getServicePortTls().isPresent()) {
+ this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname,
proxyConfig.getServicePortTls().get());
+ } else {
+ this.serviceUrlTls = null;
+ }
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
acceptorThreadFactory);
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads,
workersThreadFactory);
@@ -132,18 +141,21 @@ public void start() throws Exception {
bootstrap.childHandler(new ServiceChannelInitializer(this,
proxyConfig, false));
// Bind and start to accept incoming connections.
- try {
- bootstrap.bind(proxyConfig.getServicePort()).sync();
- } catch (Exception e) {
- throw new IOException("Failed to bind Pulsar Proxy on port " +
proxyConfig.getServicePort(), e);
+ if (proxyConfig.getServicePort().isPresent()) {
+ try {
+ bootstrap.bind(proxyConfig.getServicePort().get()).sync();
+ LOG.info("Started Pulsar Proxy at {}", serviceUrl);
+ } catch (Exception e) {
+ throw new IOException("Failed to bind Pulsar Proxy on port " +
proxyConfig.getServicePort().get(), e);
+ }
}
LOG.info("Started Pulsar Proxy at {}", serviceUrl);
- if (proxyConfig.isTlsEnabledInProxy()) {
+ if (proxyConfig.getServicePortTls().isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new ServiceChannelInitializer(this,
proxyConfig, true));
- tlsBootstrap.bind(proxyConfig.getServicePortTls()).sync();
- LOG.info("Started Pulsar TLS Proxy on port {}",
proxyConfig.getServicePortTls());
+ tlsBootstrap.bind(proxyConfig.getServicePortTls().get()).sync();
+ LOG.info("Started Pulsar TLS Proxy on port {}",
proxyConfig.getServicePortTls().get());
}
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 07d75744b7..20f6b56a90 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -82,7 +82,6 @@ public WebServer(ProxyConfiguration config,
AuthenticationService authentication
this.webServiceExecutor = new ExecutorThreadPool();
this.webServiceExecutor.setName("pulsar-external-web");
this.server = new Server(webServiceExecutor);
- this.externalServicePort = config.getWebServicePort();
this.authenticationService = authenticationService;
this.config = config;
@@ -91,11 +90,13 @@ public WebServer(ProxyConfiguration config,
AuthenticationService authentication
HttpConfiguration http_config = new HttpConfiguration();
http_config.setOutputBufferSize(config.getHttpOutputBufferSize());
- ServerConnector connector = new ServerConnector(server, 1, 1, new
HttpConnectionFactory(http_config));
- connector.setPort(externalServicePort);
- connectors.add(connector);
-
- if (config.isTlsEnabledInProxy()) {
+ if (config.getWebServicePort().isPresent()) {
+ this.externalServicePort = config.getWebServicePort().get();
+ ServerConnector connector = new ServerConnector(server, 1, 1, new
HttpConnectionFactory(http_config));
+ connector.setPort(externalServicePort);
+ connectors.add(connector);
+ }
+ if (config.getWebServicePortTls().isPresent()) {
try {
SslContextFactory sslCtxFactory =
SecurityUtility.createSslContextFactory(
config.isTlsAllowInsecureConnection(),
@@ -104,7 +105,7 @@ public WebServer(ProxyConfiguration config,
AuthenticationService authentication
config.getTlsKeyFilePath(),
config.isTlsRequireTrustedClientCertOnConnect());
ServerConnector tlsConnector = new ServerConnector(server, 1,
1, sslCtxFactory);
- tlsConnector.setPort(config.getWebServicePortTls());
+ tlsConnector.setPort(config.getWebServicePortTls().get());
connectors.add(tlsConnector);
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 7c638f4d83..096ff051b5 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -62,7 +62,6 @@ protected void setup() throws Exception {
// enable tls and auth&auth at broker
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert"));
conf.setTlsCertificateFilePath(getTlsFile("broker.cert"));
conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8"));
@@ -80,7 +79,6 @@ protected void setup() throws Exception {
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
@@ -130,7 +128,7 @@ PulsarAdmin getDirectToBrokerAdminClient(String user)
throws Exception {
PulsarAdmin getAdminClient(String user) throws Exception {
return PulsarAdmin.builder()
- .serviceHttpUrl("https://localhost:" +
proxyConfig.getWebServicePortTls())
+ .serviceHttpUrl("https://localhost:" +
proxyConfig.getWebServicePortTls().get())
.tlsTrustCertsFilePath(getTlsFile("ca.cert"))
.allowTlsInsecureConnection(false)
.authentication(AuthenticationTls.class.getName(),
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index c31773fcb0..50cd920e6e 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -74,7 +74,6 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
@@ -105,7 +104,6 @@ protected void setup() throws Exception {
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
@@ -154,7 +152,7 @@ protected void cleanup() throws Exception {
public void testTlsSyncProducerAndConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls().get();
Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index cd3862877e..246078a21c 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -174,7 +174,6 @@ protected void setup() throws Exception {
servicePort = PortManager.nextFreePort();
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(false);
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
// Expires after an hour
conf.setBrokerClientAuthenticationParameters(
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 20ebeb1187..4813615a4b 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -70,12 +70,12 @@ protected void cleanup() throws Exception {
@Test
public void testInboundConnection() throws Exception {
LOG.info("Creating producer 1");
- PulsarClient client1 =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
.build();
Producer<byte[]> producer1 =
client1.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic-1").create();
LOG.info("Creating producer 2");
- PulsarClient client2 =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
+ PulsarClient client2 =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
.build();
Producer<byte[]> producer2;
Assert.assertEquals(ProxyService.rejectedConnections.get(), 0.0d);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 42915ec2ce..4997de0165 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -54,7 +54,6 @@ protected void setup() throws Exception {
servicePort = PortManager.nextFreePort();
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(false);
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
conf.setBrokerClientAuthenticationParameters("authParam:broker");
conf.setAuthenticateOriginalAuthData(true);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 35dfb94374..9c8ea850c3 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -70,7 +70,7 @@ protected void cleanup() throws Exception {
@Test
public void testLookup() throws Exception {
- PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
.connectionsPerBroker(5).ioThreads(5).build();
assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
assertTrue(proxyService.getLookupRequestSemaphore().tryAcquire());
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 69f4f5a583..7d93ca6d0d 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -152,7 +152,6 @@ protected void setup() throws Exception {
servicePort = PortManager.nextFreePort();
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(false);
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
conf.setBrokerClientAuthenticationParameters("authParam:broker");
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 09fdbdb159..be47d9ea09 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -91,7 +91,7 @@ protected void cleanup() throws Exception {
@Test
public void testProducer() throws Exception {
- PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
.build();
Producer<byte[]> producer =
client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
.create();
@@ -105,7 +105,7 @@ public void testProducer() throws Exception {
@Test
public void testProducerConsumer() throws Exception {
- PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
.build();
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://sample/test/local/producer-consumer-topic")
@@ -137,7 +137,7 @@ public void testProducerConsumer() throws Exception {
@Test
public void testPartitions() throws Exception {
admin.tenants().createTenant("sample", new TenantInfo());
- PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
.build();
admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic",
2);
@@ -164,7 +164,7 @@ public void testPartitions() throws Exception {
@Test
public void testRegexSubscription() throws Exception {
- PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort().get())
.connectionsPerBroker(5).ioThreads(5).build();
// create two topics by subscribing to a topic and closing it
@@ -207,7 +207,7 @@ public void testRegexSubscription() throws Exception {
@Test
private void testProtocolVersionAdvertisement() throws Exception {
- final String url = "pulsar://localhost:" +
proxyConfig.getServicePort();
+ final String url = "pulsar://localhost:" +
proxyConfig.getServicePort().get();
final String topic =
"persistent://sample/test/local/protocol-version-advertisement";
final String sub = "my-sub";
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index db5070fd32..2bf544d92c 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -58,7 +58,6 @@ protected void setup() throws Exception {
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(false);
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
@@ -83,7 +82,7 @@ protected void cleanup() throws Exception {
@Test
public void testProducer() throws Exception {
PulsarClient client = PulsarClient.builder()
- .serviceUrl("pulsar+ssl://localhost:" +
proxyConfig.getServicePortTls()).enableTls(true)
+ .serviceUrl("pulsar+ssl://localhost:" +
proxyConfig.getServicePortTls().get()).enableTls(true)
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build();
Producer<byte[]> producer =
client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/topic").create();
@@ -97,7 +96,7 @@ public void testProducer() throws Exception {
@Test
public void testPartitions() throws Exception {
PulsarClient client = PulsarClient.builder()
- .serviceUrl("pulsar+ssl://localhost:" +
proxyConfig.getServicePortTls()).enableTls(true)
+ .serviceUrl("pulsar+ssl://localhost:" +
proxyConfig.getServicePortTls().get()).enableTls(true)
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).build();
admin.tenants().createTenant("sample", new TenantInfo());
admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic",
2);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 5ee0dac24e..1fb9cb4a7a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -78,7 +78,6 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH);
@@ -110,7 +109,6 @@ protected void setup() throws Exception {
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
@@ -158,7 +156,7 @@ public void testProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
createAdminClient();
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls().get();
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index d894b0833f..1f743e29c2 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -142,7 +142,6 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_BROKER_KEY_FILE_PATH);
@@ -174,7 +173,6 @@ protected void setup() throws Exception {
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
@@ -223,7 +221,7 @@ public void testProxyAuthorization() throws Exception {
startProxy();
createAdminClient();
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls().get();
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
PulsarClient.builder());
@@ -274,7 +272,7 @@ public void testTlsHostVerificationProxyToClient(boolean
hostnameVerificationEna
startProxy();
createAdminClient();
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls().get();
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
PulsarClient.builder().enableTlsHostnameVerification(hostnameVerificationEnabled));
@@ -324,7 +322,7 @@ public void testTlsHostVerificationProxyToBroker(boolean
hostnameVerificationEna
proxyConfig.setTlsHostnameVerificationEnabled(hostnameVerificationEnabled);
startProxy();
createAdminClient();
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls().get();
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
PulsarClient.builder().operationTimeout(1, TimeUnit.SECONDS));
@@ -385,7 +383,6 @@ public void tlsCiphersAndProtocols(Set<String> tlsCiphers,
Set<String> tlsProtoc
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
@@ -418,7 +415,7 @@ public void tlsCiphersAndProtocols(Set<String> tlsCiphers,
Set<String> tlsProtoc
}, 3, 1000);
try {
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls().get();
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
PulsarClient.builder());
Consumer<byte[]> consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 4909a89d65..825982c3df 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -70,7 +70,6 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(false);
- conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
@@ -102,7 +101,6 @@ protected void setup() throws Exception {
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
@@ -149,7 +147,7 @@ protected void cleanup() throws Exception {
public void testDiscoveryService() throws Exception {
log.info("-- Starting {} test --", methodName);
- final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls();
+ final String proxyServiceUrl = "pulsar://localhost:" +
proxyConfig.getServicePortTls().get();
Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index 9d5317c3d6..4bca6d4ca1 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -64,7 +64,6 @@ protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(getTlsFile("ca.cert"));
conf.setTlsCertificateFilePath(getTlsFile("broker.cert"));
conf.setTlsKeyFilePath(getTlsFile("broker.key-pk8"));
@@ -82,7 +81,6 @@ protected void setup() throws Exception {
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
- proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
@@ -121,7 +119,7 @@ protected void cleanup() throws Exception {
PulsarAdmin getAdminClient(String user) throws Exception {
return PulsarAdmin.builder()
- .serviceHttpUrl("https://localhost:" +
proxyConfig.getWebServicePortTls())
+ .serviceHttpUrl("https://localhost:" +
proxyConfig.getWebServicePortTls().get())
.tlsTrustCertsFilePath(getTlsFile("ca.cert"))
.allowTlsInsecureConnection(false)
.authentication(AuthenticationTls.class.getName(),
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 5085d14552..492300538a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.HashSet;
@@ -26,7 +25,6 @@
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
-import javax.validation.constraints.AssertTrue;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
@@ -102,7 +100,7 @@ protected void cleanup() throws Exception {
@Test
public void testUnauthenticatedProxy() throws Exception {
PulsarAdmin admin = PulsarAdmin.builder()
- .serviceHttpUrl("http://127.0.0.1:" +
proxyConfig.getWebServicePort())
+ .serviceHttpUrl("http://127.0.0.1:" +
proxyConfig.getWebServicePort().get())
.build();
List<String> activeBrokers =
admin.brokers().getActiveBrokers(configClusterName);
Assert.assertEquals(activeBrokers.size(), 1);
@@ -113,7 +111,7 @@ public void testUnauthenticatedProxy() throws Exception {
@Test
public void testVipStatus() throws Exception {
Client client = ClientBuilder.newClient(new
ClientConfig().register(LoggingFeature.class));
- WebTarget webTarget = client.target("http://127.0.0.1:" +
proxyConfig.getWebServicePort())
+ WebTarget webTarget = client.target("http://127.0.0.1:" +
proxyConfig.getWebServicePort().get())
.path("/status.html");
String response = webTarget.request().get(String.class);
Assert.assertEquals(response, "OK");
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 6660cd8303..b984f81687 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -190,12 +190,12 @@ private PulsarClient createClientInstance(ClusterData
clusterData) throws IOExce
config.getBrokerClientAuthenticationParameters());
}
- if (config.isTlsEnabled()) {
- if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
- clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
- } else if (isNotBlank(clusterData.getServiceUrlTls())) {
- clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
- }
+ if (config.isBrokerClientTlsEnabled()) {
+ if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
+
clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls());
+ } else if (isNotBlank(clusterData.getServiceUrlTls())) {
+
clientBuilder.serviceUrl(clusterData.getServiceUrlTls());
+ }
} else if (isNotBlank(clusterData.getBrokerServiceUrl())) {
clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl());
} else {
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index c7fd3b1740..147058170c 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -65,13 +65,13 @@ public ProxyServer(WebSocketProxyConfiguration config)
this.server = new Server(executorService);
List<ServerConnector> connectors = new ArrayList<>();
- ServerConnector connector = new ServerConnector(server);
-
- connector.setPort(config.getWebServicePort());
- connectors.add(connector);
-
+ if (config.getWebServicePort().isPresent()) {
+ ServerConnector connector = new ServerConnector(server);
+ connector.setPort(config.getWebServicePort().get());
+ connectors.add(connector);
+ }
// TLS enabled connector
- if (config.isTlsEnabled()) {
+ if (config.getWebServicePortTls().isPresent()) {
try {
SslContextFactory sslCtxFactory =
SecurityUtility.createSslContextFactory(
config.isTlsAllowInsecureConnection(),
@@ -80,12 +80,11 @@ public ProxyServer(WebSocketProxyConfiguration config)
config.getTlsKeyFilePath(),
config.getTlsRequireTrustedClientCertOnConnect());
ServerConnector tlsConnector = new ServerConnector(server, -1,
-1, sslCtxFactory);
- tlsConnector.setPort(config.getWebServicePortTls());
+ tlsConnector.setPort(config.getWebServicePortTls().get());
connectors.add(tlsConnector);
} catch (GeneralSecurityException e) {
throw new PulsarServerException(e);
}
-
}
// Limit number of concurrent HTTP connections to avoid getting out of
@@ -117,7 +116,7 @@ public void addRestResources(String basePath, String
javaPackages, String attrib
}
public void start() throws PulsarServerException {
- log.info("Starting web socket proxy at port {}",
conf.getWebServicePort());
+ log.info("Starting web socket proxy at port {}",
conf.getWebServicePort().get());
try {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index d89a9a0f07..7f714ae7ac 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.websocket.service;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -58,9 +59,9 @@
private long zooKeeperSessionTimeoutMillis = 30000;
// Port to use to server HTTP request
- private int webServicePort = 8080;
+ private Integer webServicePort = 8080;
// Port to use to server HTTPS request
- private int webServicePortTls = 8443;
+ private Integer webServicePortTls;
// Hostname or IP address the service binds on, default is 0.0.0.0.
private String bindAddress;
// --- Authentication ---
@@ -99,8 +100,10 @@
private String anonymousUserRole = null;
/***** --- TLS --- ****/
- // Enable TLS
+ @Deprecated
private boolean tlsEnabled = false;
+
+ private boolean brokerClientTlsEnabled = false;
// Path for the TLS certificate file
private String tlsCertificateFilePath;
// Path for the TLS private key file
@@ -189,16 +192,16 @@ public void setZooKeeperSessionTimeoutMillis(long
zooKeeperSessionTimeoutMillis)
this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis;
}
- public int getWebServicePort() {
- return webServicePort;
+ public Optional<Integer> getWebServicePort() {
+ return Optional.ofNullable(webServicePort);
}
public void setWebServicePort(int webServicePort) {
this.webServicePort = webServicePort;
}
- public int getWebServicePortTls() {
- return webServicePortTls;
+ public Optional<Integer> getWebServicePortTls() {
+ return Optional.ofNullable(webServicePortTls);
}
public void setWebServicePortTls(int webServicePortTls) {
@@ -317,12 +320,12 @@ public void setAnonymousUserRole(String
anonymousUserRole) {
this.anonymousUserRole = anonymousUserRole;
}
- public boolean isTlsEnabled() {
- return tlsEnabled;
+ public boolean isBrokerClientTlsEnabled() {
+ return brokerClientTlsEnabled || tlsEnabled;
}
- public void setTlsEnabled(boolean tlsEnabled) {
- this.tlsEnabled = tlsEnabled;
+ public void setBrokerClientTlsEnabled(boolean brokerClientTlsEnabled) {
+ this.brokerClientTlsEnabled = brokerClientTlsEnabled;
}
public String getTlsCertificateFilePath() {
diff --git
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
index a6f3845c9b..888ad964d0 100644
---
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
+++
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java
@@ -45,7 +45,6 @@ public void httpLookupTest() throws Exception{
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
lookupField.setAccessible(true);
Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
"org.apache.pulsar.client.impl.HttpLookupService");
- Assert.assertFalse(testClient.getConfiguration().isUseTls());
service.close();
}
@@ -55,7 +54,7 @@ public void httpsLookupTest() throws Exception{
conf.setServiceUrl("http://localhost:8080");
conf.setServiceUrlTls("https://localhost:8443");
conf.setBrokerServiceUrl("pulsar://localhost:6650");
- conf.setTlsEnabled(true);
+ conf.setBrokerClientTlsEnabled(true);
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl)
service.getPulsarClient();
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
@@ -77,7 +76,6 @@ public void binaryLookupTest() throws Exception{
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
lookupField.setAccessible(true);
Assert.assertEquals(lookupField.get(testClient).getClass().getName(),
"org.apache.pulsar.client.impl.BinaryProtoLookupService");
- Assert.assertFalse(testClient.getConfiguration().isUseTls());
service.close();
}
@@ -88,7 +86,7 @@ public void binaryTlsLookupTest() throws Exception{
conf.setServiceUrlTls("https://localhost:8443");
conf.setBrokerServiceUrl("pulsar://localhost:6650");
conf.setBrokerServiceUrlTls("pulsar+ssl://localhost:6651");
- conf.setTlsEnabled(true);
+ conf.setBrokerClientTlsEnabled(true);
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl)
service.getPulsarClient();
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services