This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 90980da2ea4 [fix][broker][branch-2.10] Fix geo-replication admin
(#19608)
90980da2ea4 is described below
commit 90980da2ea4a24e78f018eb5239839d9bd72c444
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Feb 24 10:09:30 2023 +0800
[fix][broker][branch-2.10] Fix geo-replication admin (#19608)
Signed-off-by: Zixuan Liu <[email protected]>
---
pulsar-broker/pom.xml | 22 ++++++
.../pulsar/broker/service/BrokerService.java | 70 ++++++++++++-----
...torTlsTest.java => ReplicatorAdminTlsTest.java} | 39 +++++-----
.../ReplicatorAdminTlsWithKeyStoreTest.java | 72 +++++++++++++++++
.../pulsar/broker/service/ReplicatorTestBase.java | 86 +++++++++++++++++----
.../pulsar/broker/service/ReplicatorTlsTest.java | 18 +++--
.../certificate-authority/jks/broker.keystore.jks | Bin 0 -> 2254 bytes
.../jks/broker.truststore.jks | Bin 0 -> 969 bytes
.../certificate-authority/jks/client.keystore.jks | Bin 0 -> 2257 bytes
.../jks/client.truststore.jks | Bin 0 -> 971 bytes
10 files changed, 248 insertions(+), 59 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 70147d3b597..dc1de6c98b0 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -581,6 +581,28 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+
<outputDirectory>${project.build.testOutputDirectory}/certificate-authority</outputDirectory>
+ <overwrite>true</overwrite>
+ <resources>
+ <resource>
+
<directory>${project.parent.basedir}/tests/certificate-authority</directory>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<resources>
<resource>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0ea95995f4b..ecc1312ac8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1271,6 +1271,23 @@ public class BrokerService implements Closeable {
}
}
+ private void configAdminTlsSettings(PulsarAdminBuilder adminBuilder,
boolean brokerClientTlsEnabledWithKeyStore,
+ boolean isTlsAllowInsecureConnection,
+ String brokerClientTlsTrustStoreType,
+ String brokerClientTlsTrustStore,
+ String
brokerClientTlsTrustStorePassword,
+ String brokerClientTrustCertsFilePath)
{
+ if (brokerClientTlsEnabledWithKeyStore) {
+ adminBuilder.useKeyStoreTls(true)
+ .tlsTrustStoreType(brokerClientTlsTrustStoreType)
+ .tlsTrustStorePath(brokerClientTlsTrustStore)
+ .tlsTrustStorePassword(brokerClientTlsTrustStorePassword);
+ } else {
+ adminBuilder.tlsTrustCertsFilePath(brokerClientTrustCertsFilePath);
+ }
+ adminBuilder.allowTlsInsecureConnection(isTlsAllowInsecureConnection);
+ }
+
public PulsarAdmin getClusterPulsarAdmin(String cluster,
Optional<ClusterData> clusterDataOp) {
PulsarAdmin admin = clusterAdmins.get(cluster);
if (admin != null) {
@@ -1280,32 +1297,49 @@ public class BrokerService implements Closeable {
try {
ClusterData data = clusterDataOp
.orElseThrow(() -> new
MetadataStoreException.NotFoundException(cluster));
+ PulsarAdminBuilder builder = PulsarAdmin.builder();
ServiceConfiguration conf = pulsar.getConfig();
- boolean isTlsUrl = conf.isBrokerClientTlsEnabled() &&
isNotBlank(data.getServiceUrlTls());
- String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() :
data.getServiceUrl();
- PulsarAdminBuilder builder =
PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
-
// Apply all arbitrary configuration. This must be called
before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of
the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more
information.
builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(),
"brokerClient_"));
- builder.authentication(
- conf.getBrokerClientAuthenticationPlugin(),
- conf.getBrokerClientAuthenticationParameters());
-
- if (isTlsUrl) {
-
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
- if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
- builder.useKeyStoreTls(true)
-
.tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
-
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
-
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
- } else {
-
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
- }
+ if (data.getAuthenticationPlugin() != null &&
data.getAuthenticationParameters() != null) {
+ builder.authentication(data.getAuthenticationPlugin(),
data.getAuthenticationParameters());
+ } else {
+
builder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
+
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
+ }
+
+ boolean isTlsEnabled = data.isBrokerClientTlsEnabled() ||
conf.isBrokerClientTlsEnabled();
+ if (isTlsEnabled &&
StringUtils.isEmpty(data.getServiceUrlTls())) {
+ throw new IllegalArgumentException("serviceUrlTls is
empty, brokerClientTlsEnabled: "
+ + isTlsEnabled);
+ } else if (StringUtils.isEmpty(data.getServiceUrl())) {
+ throw new IllegalArgumentException("serviceUrl is empty,
brokerClientTlsEnabled: " + isTlsEnabled);
+ }
+ String adminApiUrl = isTlsEnabled ? data.getServiceUrlTls() :
data.getServiceUrl();
+ builder.serviceHttpUrl(adminApiUrl);
+ if (data.isBrokerClientTlsEnabled()) {
+ configAdminTlsSettings(builder,
+ data.isBrokerClientTlsEnabledWithKeyStore(),
+ data.isTlsAllowInsecureConnection(),
+ data.getBrokerClientTlsTrustStoreType(),
+ data.getBrokerClientTlsTrustStore(),
+ data.getBrokerClientTlsTrustStorePassword(),
+ data.getBrokerClientTrustCertsFilePath()
+ );
+ } else if (conf.isBrokerClientTlsEnabled()) {
+ configAdminTlsSettings(builder,
+ conf.isBrokerClientTlsEnabledWithKeyStore(),
+ conf.isTlsAllowInsecureConnection(),
+ conf.getBrokerClientTlsTrustStoreType(),
+ conf.getBrokerClientTlsTrustStore(),
+ conf.getBrokerClientTlsTrustStorePassword(),
+ conf.getBrokerClientTrustCertsFilePath()
+ );
}
// most of the admin request requires to make zk-call so, keep
the max read-timeout based on
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
similarity index 52%
copy from
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
copy to
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
index 7c28f183c99..373b4794c86 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
@@ -18,26 +18,25 @@
*/
package org.apache.pulsar.broker.service;
+import com.google.common.collect.Lists;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import org.testng.collections.Lists;
@Test(groups = "broker")
-public class ReplicatorTlsTest extends ReplicatorTestBase {
+public class ReplicatorAdminTlsTest extends ReplicatorTestBase {
@Override
@BeforeClass(timeOut = 300000)
public void setup() throws Exception {
- config1.setBrokerClientTlsEnabled(true);
- config2.setBrokerClientTlsEnabled(true);
- config3.setBrokerClientTlsEnabled(true);
super.setup();
}
@@ -48,17 +47,21 @@ public class ReplicatorTlsTest extends ReplicatorTestBase {
}
@Test
- public void testReplicationClient() throws Exception {
- log.info("--- Starting ReplicatorTlsTest::testReplicationClient ---");
+ public void testReplicationAdmin() throws Exception {
for (BrokerService ns : Lists.newArrayList(ns1, ns2, ns3)) {
- ns.getReplicationClients().forEach((cluster, client) -> {
- assertTrue(((PulsarClientImpl)
client).getConfiguration().isUseTls());
- assertEquals(((PulsarClientImpl)
client).getConfiguration().getTlsTrustCertsFilePath(),
- TLS_SERVER_CERT_FILE_PATH);
+ // load the admin
+ ns.getClusterPulsarAdmin(cluster1,
Optional.of(admin1.clusters().getCluster(cluster1)));
+ ns.getClusterPulsarAdmin(cluster2,
Optional.of(admin1.clusters().getCluster(cluster2)));
+ ns.getClusterPulsarAdmin(cluster3,
Optional.of(admin1.clusters().getCluster(cluster3)));
+
+ // verify the admin
+ ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins =
ns.getClusterAdmins();
+ assertFalse(clusterAdmins.isEmpty());
+ clusterAdmins.forEach((cluster, admin) -> {
+ ClientConfigurationData clientConfigData = ((PulsarAdminImpl)
admin).getClientConfigData();
+ assertEquals(clientConfigData.getTlsTrustCertsFilePath(),
caCertFilePath);
+ assertTrue(clientConfigData.isUseTls());
});
}
}
-
- private static final Logger log =
LoggerFactory.getLogger(ReplicatorTlsTest.class);
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
new file mode 100644
index 00000000000..9a7effd81ce
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Lists;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ReplicatorAdminTlsWithKeyStoreTest extends ReplicatorTestBase {
+
+ @Override
+ @BeforeClass(timeOut = 300000)
+ public void setup() throws Exception {
+ tlsWithKeyStore = true;
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test
+ public void testReplicationAdmin() throws Exception {
+ for (BrokerService ns : Lists.newArrayList(ns1, ns2, ns3)) {
+ // load the admin
+ ns.getClusterPulsarAdmin(cluster1,
Optional.of(admin1.clusters().getCluster(cluster1)));
+ ns.getClusterPulsarAdmin(cluster2,
Optional.of(admin1.clusters().getCluster(cluster2)));
+ ns.getClusterPulsarAdmin(cluster3,
Optional.of(admin1.clusters().getCluster(cluster3)));
+
+ // verify the admin
+ ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins =
ns.getClusterAdmins();
+ assertFalse(clusterAdmins.isEmpty());
+ clusterAdmins.forEach((cluster, admin) -> {
+ ClientConfigurationData clientConfigData = ((PulsarAdminImpl)
admin).getClientConfigData();
+ assertEquals(clientConfigData.getTlsTrustStorePath(),
clientTrustStorePath);
+ assertEquals(clientConfigData.getTlsTrustStorePassword(),
keyStorePassword);
+ assertEquals(clientConfigData.getTlsTrustStoreType(),
keyStoreType);
+ assertTrue(clientConfigData.isUseKeyStoreTls());
+ assertTrue(clientConfigData.isUseTls());
+ });
+ }
+ }
+}
+
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index de246e53334..4270ec448b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import com.google.common.io.Resources;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -86,8 +87,34 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
- protected static final String TLS_SERVER_CERT_FILE_PATH =
"./src/test/resources/certificate/server.crt";
- protected static final String TLS_SERVER_KEY_FILE_PATH =
"./src/test/resources/certificate/server.key";
+ // PEM
+ protected final String brokerCertFilePath =
+
Resources.getResource("certificate-authority/server-keys/broker.cert.pem").getPath();
+ protected final String brokerFilePath =
+
Resources.getResource("certificate-authority/server-keys/broker.key-pk8.pem").getPath();
+ protected final String clientCertFilePath =
+
Resources.getResource("certificate-authority/client-keys/admin.cert.pem").getPath();
+ protected final String clientKeyFilePath =
+
Resources.getResource("certificate-authority/client-keys/admin.key-pk8.pem").getPath();
+ protected final String caCertFilePath =
+
Resources.getResource("certificate-authority/certs/ca.cert.pem").getPath();
+
+ // KEYSTORE
+ protected boolean tlsWithKeyStore = false;
+ protected final static String brokerKeyStorePath =
+
Resources.getResource("certificate-authority/jks/broker.keystore.jks").getPath();
+ protected final static String brokerTrustStorePath =
+
Resources.getResource("certificate-authority/jks/broker.truststore.jks").getPath();
+ protected final static String clientKeyStorePath =
+
Resources.getResource("certificate-authority/jks/client.keystore.jks").getPath();
+ protected final static String clientTrustStorePath =
+
Resources.getResource("certificate-authority/jks/client.truststore.jks").getPath();
+ protected final static String keyStoreType = "JKS";
+ protected final static String keyStorePassword = "111111";
+
+ protected final String cluster1 = "r1";
+ protected final String cluster2 = "r2";
+ protected final String cluster3 = "r3";
// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
@@ -156,23 +183,41 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
// Provision the global namespace
- admin1.clusters().createCluster("r1", ClusterData.builder()
+ admin1.clusters().createCluster(cluster1, ClusterData.builder()
.serviceUrl(url1.toString())
.serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(true)
+ .brokerClientTrustCertsFilePath(caCertFilePath)
+ .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+ .brokerClientTlsTrustStore(clientTrustStorePath)
+ .brokerClientTlsTrustStorePassword(keyStorePassword)
+ .brokerClientTlsTrustStoreType(keyStoreType)
.build());
- admin1.clusters().createCluster("r2", ClusterData.builder()
+ admin1.clusters().createCluster(cluster2, ClusterData.builder()
.serviceUrl(url2.toString())
.serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(true)
+ .brokerClientTrustCertsFilePath(caCertFilePath)
+ .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+ .brokerClientTlsTrustStore(clientTrustStorePath)
+ .brokerClientTlsTrustStorePassword(keyStorePassword)
+ .brokerClientTlsTrustStoreType(keyStoreType)
.build());
- admin1.clusters().createCluster("r3", ClusterData.builder()
+ admin1.clusters().createCluster(cluster3, ClusterData.builder()
.serviceUrl(url3.toString())
.serviceUrlTls(urlTls3.toString())
.brokerServiceUrl(pulsar3.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(true)
+ .brokerClientTrustCertsFilePath(caCertFilePath)
+ .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+ .brokerClientTlsTrustStore(clientTrustStorePath)
+ .brokerClientTlsTrustStorePassword(keyStorePassword)
+ .brokerClientTlsTrustStoreType(keyStoreType)
.build());
admin1.tenants().createTenant("pulsar",
@@ -180,12 +225,12 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1",
"r2", "r3"));
admin1.namespaces().createNamespace("pulsar/ns1",
Sets.newHashSet("r1", "r2"));
- assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(),
url1.toString());
- assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(),
url2.toString());
- assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(),
url3.toString());
- assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(),
pulsar2.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(),
pulsar3.getBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster(cluster1).getServiceUrl(),
url1.toString());
+ assertEquals(admin2.clusters().getCluster(cluster2).getServiceUrl(),
url2.toString());
+ assertEquals(admin2.clusters().getCluster(cluster3).getServiceUrl(),
url3.toString());
+
assertEquals(admin2.clusters().getCluster(cluster1).getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrl());
+
assertEquals(admin2.clusters().getCluster(cluster2).getBrokerServiceUrl(),
pulsar2.getBrokerServiceUrl());
+
assertEquals(admin2.clusters().getCluster(cluster3).getBrokerServiceUrl(),
pulsar3.getBrokerServiceUrl());
// Also create V1 namespace for compatibility check
admin1.clusters().createCluster("global", ClusterData.builder()
@@ -193,7 +238,7 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
.serviceUrlTls("https://global:8443")
.build());
admin1.namespaces().createNamespace("pulsar/global/ns");
-
admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns",
Sets.newHashSet("r1", "r2", "r3"));
+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns",
Sets.newHashSet(cluster1, cluster2, cluster3));
Thread.sleep(100);
log.info("--- ReplicatorTestBase::setup completed ---");
@@ -206,11 +251,11 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
}
public void setConfig1DefaultValue(){
- setConfigDefaults(config1, "r1", bkEnsemble1);
+ setConfigDefaults(config1, cluster1, bkEnsemble1);
}
public void setConfig2DefaultValue() {
- setConfigDefaults(config2, "r2", bkEnsemble2);
+ setConfigDefaults(config2, cluster2, bkEnsemble2);
}
private void setConfigDefaults(ServiceConfiguration config, String
clusterName,
@@ -227,9 +272,16 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
- config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config.setTlsCertificateFilePath(brokerCertFilePath);
+ config.setTlsKeyFilePath(brokerFilePath);
+ config.setTlsTrustCertsFilePath(caCertFilePath);
+ config.setTlsEnabledWithKeyStore(tlsWithKeyStore);
+ config.setTlsKeyStore(brokerKeyStorePath);
+ config.setTlsKeyStoreType(keyStoreType);
+ config.setTlsKeyStorePassword(keyStorePassword);
+ config.setTlsTrustStore(brokerTrustStorePath);
+ config.setTlsTrustStoreType(keyStoreType);
+ config.setTlsTrustStorePassword(keyStorePassword);
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType("non-partitioned");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
index 7c28f183c99..1be0d664902 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
+import com.google.common.collect.Lists;
+import java.util.Optional;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import org.testng.collections.Lists;
@Test(groups = "broker")
public class ReplicatorTlsTest extends ReplicatorTestBase {
@@ -49,12 +50,17 @@ public class ReplicatorTlsTest extends ReplicatorTestBase {
@Test
public void testReplicationClient() throws Exception {
- log.info("--- Starting ReplicatorTlsTest::testReplicationClient ---");
for (BrokerService ns : Lists.newArrayList(ns1, ns2, ns3)) {
+ // load the client
+ ns.getReplicationClient(cluster1,
Optional.of(admin1.clusters().getCluster(cluster1)));
+ ns.getReplicationClient(cluster2,
Optional.of(admin1.clusters().getCluster(cluster2)));
+ ns.getReplicationClient(cluster3,
Optional.of(admin1.clusters().getCluster(cluster3)));
+
+ // verify the client
ns.getReplicationClients().forEach((cluster, client) -> {
- assertTrue(((PulsarClientImpl)
client).getConfiguration().isUseTls());
- assertEquals(((PulsarClientImpl)
client).getConfiguration().getTlsTrustCertsFilePath(),
- TLS_SERVER_CERT_FILE_PATH);
+ ClientConfigurationData configuration = ((PulsarClientImpl)
client).getConfiguration();
+ assertTrue(configuration.isUseTls());
+ assertEquals(configuration.getTlsTrustCertsFilePath(),
caCertFilePath);
});
}
}
diff --git a/tests/certificate-authority/jks/broker.keystore.jks
b/tests/certificate-authority/jks/broker.keystore.jks
new file mode 100644
index 00000000000..6f2df055f26
Binary files /dev/null and
b/tests/certificate-authority/jks/broker.keystore.jks differ
diff --git a/tests/certificate-authority/jks/broker.truststore.jks
b/tests/certificate-authority/jks/broker.truststore.jks
new file mode 100644
index 00000000000..9c35356c540
Binary files /dev/null and
b/tests/certificate-authority/jks/broker.truststore.jks differ
diff --git a/tests/certificate-authority/jks/client.keystore.jks
b/tests/certificate-authority/jks/client.keystore.jks
new file mode 100644
index 00000000000..0c9d33408e1
Binary files /dev/null and
b/tests/certificate-authority/jks/client.keystore.jks differ
diff --git a/tests/certificate-authority/jks/client.truststore.jks
b/tests/certificate-authority/jks/client.truststore.jks
new file mode 100644
index 00000000000..ac59bd92541
Binary files /dev/null and
b/tests/certificate-authority/jks/client.truststore.jks differ