This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b38556aa19a [fix][broker] Fix geo-replication admin (#19548)
b38556aa19a is described below
commit b38556aa19a1b29accc3b2d64170d169e80ce135
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Feb 22 10:18:13 2023 +0800
[fix][broker] Fix geo-replication admin (#19548)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../pulsar/broker/service/BrokerService.java | 92 +++++++++++++++------
...torTlsTest.java => ReplicatorAdminTlsTest.java} | 38 +++++----
.../ReplicatorAdminTlsWithKeyStoreTest.java | 74 +++++++++++++++++
.../pulsar/broker/service/ReplicatorTestBase.java | 96 ++++++++++++++++++----
.../pulsar/broker/service/ReplicatorTlsTest.java | 16 +++-
5 files changed, 256 insertions(+), 60 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index db7a3f16f97..b9b63427b37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1370,6 +1370,29 @@ public class BrokerService implements Closeable {
}
}
+ private void configAdminTlsSettings(PulsarAdminBuilder adminBuilder, boolean brokerClientTlsEnabledWithKeyStore,
+ boolean isTlsAllowInsecureConnection,
+ String brokerClientTlsTrustStoreType, String brokerClientTlsTrustStore,
+ String brokerClientTlsTrustStorePassword, String brokerClientTlsKeyStoreType,
+ String brokerClientTlsKeyStore, String brokerClientTlsKeyStorePassword,
+ String brokerClientTrustCertsFilePath,
+ String brokerClientKeyFilePath, String brokerClientCertificateFilePath) {
+ if (brokerClientTlsEnabledWithKeyStore) {
+ adminBuilder.useKeyStoreTls(true)
+ .tlsTrustStoreType(brokerClientTlsTrustStoreType)
+ .tlsTrustStorePath(brokerClientTlsTrustStore)
+ .tlsTrustStorePassword(brokerClientTlsTrustStorePassword)
+ .tlsKeyStoreType(brokerClientTlsKeyStoreType)
+ .tlsKeyStorePath(brokerClientTlsKeyStore)
+ .tlsKeyStorePassword(brokerClientTlsKeyStorePassword);
+ } else {
+ adminBuilder.tlsTrustCertsFilePath(brokerClientTrustCertsFilePath)
+ .tlsKeyFilePath(brokerClientKeyFilePath)
+ .tlsCertificateFilePath(brokerClientCertificateFilePath);
+ }
+ adminBuilder.allowTlsInsecureConnection(isTlsAllowInsecureConnection);
+ }
+
public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> clusterDataOp) {
PulsarAdmin admin = clusterAdmins.get(cluster);
if (admin != null) {
@@ -1379,37 +1402,58 @@ public class BrokerService implements Closeable {
try {
ClusterData data = clusterDataOp
.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
+ PulsarAdminBuilder builder = PulsarAdmin.builder();
ServiceConfiguration conf = pulsar.getConfig();
-
- boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls());
- String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl();
- PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
-
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"));
- builder.authentication(
- conf.getBrokerClientAuthenticationPlugin(),
- conf.getBrokerClientAuthenticationParameters());
-
- if (isTlsUrl) {
- builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
- if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
- builder.useKeyStoreTls(true)
- .tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
- .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
- .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
- .tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
- .tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
- .tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
- } else {
- builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
- .tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
- .tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
- }
+ if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
+ builder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
+ } else {
+ builder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
+ pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
+ }
+
+ boolean isTlsEnabled = data.isBrokerClientTlsEnabled() || conf.isBrokerClientTlsEnabled();
+ if (isTlsEnabled && StringUtils.isEmpty(data.getServiceUrlTls())) {
+ throw new IllegalArgumentException("serviceUrlTls is empty, brokerClientTlsEnabled: "
+ + isTlsEnabled);
+ } else if (StringUtils.isEmpty(data.getServiceUrl())) {
+ throw new IllegalArgumentException("serviceUrl is empty, brokerClientTlsEnabled: " + isTlsEnabled);
+ }
+ String adminApiUrl = isTlsEnabled ? data.getServiceUrlTls() : data.getServiceUrl();
+ builder.serviceHttpUrl(adminApiUrl);
+ if (data.isBrokerClientTlsEnabled()) {
+ configAdminTlsSettings(builder,
+ data.isBrokerClientTlsEnabledWithKeyStore(),
+ data.isTlsAllowInsecureConnection(),
+ data.getBrokerClientTlsTrustStoreType(),
+ data.getBrokerClientTlsTrustStore(),
+ data.getBrokerClientTlsTrustStorePassword(),
+ data.getBrokerClientTlsKeyStoreType(),
+ data.getBrokerClientTlsKeyStore(),
+ data.getBrokerClientTlsKeyStorePassword(),
+ data.getBrokerClientTrustCertsFilePath(),
+ data.getBrokerClientKeyFilePath(),
+ data.getBrokerClientCertificateFilePath()
+ );
+ } else if (conf.isBrokerClientTlsEnabled()) {
+ configAdminTlsSettings(builder,
+ conf.isBrokerClientTlsEnabledWithKeyStore(),
+ conf.isTlsAllowInsecureConnection(),
+ conf.getBrokerClientTlsTrustStoreType(),
+ conf.getBrokerClientTlsTrustStore(),
+ conf.getBrokerClientTlsTrustStorePassword(),
+ conf.getBrokerClientTlsKeyStoreType(),
+ conf.getBrokerClientTlsKeyStore(),
+ conf.getBrokerClientTlsKeyStorePassword(),
+ conf.getBrokerClientTrustCertsFilePath(),
+ conf.getBrokerClientKeyFilePath(),
+ conf.getBrokerClientCertificateFilePath()
+ );
}
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
similarity index 50%
copy from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
copy to pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
index e1506d16188..a5d14ca0487 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
@@ -19,24 +19,24 @@
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.List;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class ReplicatorTlsTest extends ReplicatorTestBase {
+public class ReplicatorAdminTlsTest extends ReplicatorTestBase {
@Override
@BeforeClass(timeOut = 300000)
public void setup() throws Exception {
- config1.setBrokerClientTlsEnabled(true);
- config2.setBrokerClientTlsEnabled(true);
- config3.setBrokerClientTlsEnabled(true);
super.setup();
}
@@ -47,17 +47,23 @@ public class ReplicatorTlsTest extends ReplicatorTestBase {
}
@Test
- public void testReplicationClient() throws Exception {
- log.info("--- Starting ReplicatorTlsTest::testReplicationClient ---");
+ public void testReplicationAdmin() throws Exception {
for (BrokerService ns : List.of(ns1, ns2, ns3)) {
- ns.getReplicationClients().forEach((cluster, client) -> {
- assertTrue(((PulsarClientImpl) client).getConfiguration().isUseTls());
- assertEquals(((PulsarClientImpl) client).getConfiguration().getTlsTrustCertsFilePath(),
- TLS_SERVER_CERT_FILE_PATH);
+ // load the admin
+ ns.getClusterPulsarAdmin(cluster1, Optional.of(admin1.clusters().getCluster(cluster1)));
+ ns.getClusterPulsarAdmin(cluster2, Optional.of(admin1.clusters().getCluster(cluster2)));
+ ns.getClusterPulsarAdmin(cluster3, Optional.of(admin1.clusters().getCluster(cluster3)));
+
+ // verify the admin
+ ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins = ns.getClusterAdmins();
+ assertFalse(clusterAdmins.isEmpty());
+ clusterAdmins.forEach((cluster, admin) -> {
+ ClientConfigurationData clientConfigData = ((PulsarAdminImpl) admin).getClientConfigData();
+ assertEquals(clientConfigData.getTlsTrustCertsFilePath(), caCertFilePath);
+ assertEquals(clientConfigData.getTlsKeyFilePath(), clientKeyFilePath);
+ assertEquals(clientConfigData.getTlsCertificateFilePath(), clientCertFilePath);
+ assertTrue(clientConfigData.isUseTls());
});
}
}
-
- private static final Logger log = LoggerFactory.getLogger(ReplicatorTlsTest.class);
-
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
new file mode 100644
index 00000000000..3d3eb3faa72
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.Optional;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ReplicatorAdminTlsWithKeyStoreTest extends ReplicatorTestBase {
+
+ @Override
+ @BeforeClass(timeOut = 300000)
+ public void setup() throws Exception {
+ tlsWithKeyStore = true;
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test
+ public void testReplicationAdmin() throws Exception {
+ for (BrokerService ns : List.of(ns1, ns2, ns3)) {
+ // load the admin
+ ns.getClusterPulsarAdmin(cluster1, Optional.of(admin1.clusters().getCluster(cluster1)));
+ ns.getClusterPulsarAdmin(cluster2, Optional.of(admin1.clusters().getCluster(cluster2)));
+ ns.getClusterPulsarAdmin(cluster3, Optional.of(admin1.clusters().getCluster(cluster3)));
+
+ // verify the admin
+ ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins = ns.getClusterAdmins();
+ assertFalse(clusterAdmins.isEmpty());
+ clusterAdmins.forEach((cluster, admin) -> {
+ ClientConfigurationData clientConfigData = ((PulsarAdminImpl) admin).getClientConfigData();
+ assertEquals(clientConfigData.getTlsKeyStorePath(), clientKeyStorePath);
+ assertEquals(clientConfigData.getTlsKeyStorePassword(), keyStorePassword);
+ assertEquals(clientConfigData.getTlsKeyStoreType(), keyStoreType);
+ assertEquals(clientConfigData.getTlsTrustStorePath(), clientTrustStorePath);
+ assertEquals(clientConfigData.getTlsTrustStorePassword(), keyStorePassword);
+ assertEquals(clientConfigData.getTlsTrustStoreType(), keyStoreType);
+ assertTrue(clientConfigData.isUseKeyStoreTls());
+ assertTrue(clientConfigData.isUseTls());
+ });
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index a7752b4a63f..b83e8ac9d2d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import com.google.common.io.Resources;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -87,8 +88,29 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
- protected static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
- protected static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
+ // PEM
+ protected final String brokerCertFilePath = Resources.getResource("certificate-authority/server-keys/broker.cert.pem").getPath();
+ protected final String brokerFilePath = Resources.getResource("certificate-authority/server-keys/broker.key-pk8.pem").getPath();
+ protected final String clientCertFilePath = Resources.getResource("certificate-authority/client-keys/admin.cert.pem").getPath();
+ protected final String clientKeyFilePath = Resources.getResource("certificate-authority/client-keys/admin.key-pk8.pem").getPath();
+ protected final String caCertFilePath = Resources.getResource("certificate-authority/certs/ca.cert.pem").getPath();
+
+ // KEYSTORE
+ protected boolean tlsWithKeyStore = false;
+ protected final static String brokerKeyStorePath =
+ Resources.getResource("certificate-authority/jks/broker.keystore.jks").getPath();
+ protected final static String brokerTrustStorePath =
+ Resources.getResource("certificate-authority/jks/broker.truststore.jks").getPath();
+ protected final static String clientKeyStorePath =
+ Resources.getResource("certificate-authority/jks/client.keystore.jks").getPath();
+ protected final static String clientTrustStorePath =
+ Resources.getResource("certificate-authority/jks/client.truststore.jks").getPath();
+ protected final static String keyStoreType = "JKS";
+ protected final static String keyStorePassword = "111111";
+
+ protected final String cluster1 = "r1";
+ protected final String cluster2 = "r2";
+ protected final String cluster3 = "r3";
// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
@@ -157,23 +179,56 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
// Provision the global namespace
- admin1.clusters().createCluster("r1", ClusterData.builder()
+ admin1.clusters().createCluster(cluster1, ClusterData.builder()
.serviceUrl(url1.toString())
.serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(true)
+ .brokerClientCertificateFilePath(clientCertFilePath)
+ .brokerClientKeyFilePath(clientKeyFilePath)
+ .brokerClientTrustCertsFilePath(caCertFilePath)
+ .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+ .brokerClientTlsKeyStore(clientKeyStorePath)
+ .brokerClientTlsKeyStorePassword(keyStorePassword)
+ .brokerClientTlsKeyStoreType(keyStoreType)
+ .brokerClientTlsTrustStore(clientTrustStorePath)
+ .brokerClientTlsTrustStorePassword(keyStorePassword)
+ .brokerClientTlsTrustStoreType(keyStoreType)
.build());
- admin1.clusters().createCluster("r2", ClusterData.builder()
+ admin1.clusters().createCluster(cluster2, ClusterData.builder()
.serviceUrl(url2.toString())
.serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(true)
+ .brokerClientCertificateFilePath(clientCertFilePath)
+ .brokerClientKeyFilePath(clientKeyFilePath)
+ .brokerClientTrustCertsFilePath(caCertFilePath)
+ .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+ .brokerClientTlsKeyStore(clientKeyStorePath)
+ .brokerClientTlsKeyStorePassword(keyStorePassword)
+ .brokerClientTlsKeyStoreType(keyStoreType)
+ .brokerClientTlsTrustStore(clientTrustStorePath)
+ .brokerClientTlsTrustStorePassword(keyStorePassword)
+ .brokerClientTlsTrustStoreType(keyStoreType)
.build());
- admin1.clusters().createCluster("r3", ClusterData.builder()
+ admin1.clusters().createCluster(cluster3, ClusterData.builder()
.serviceUrl(url3.toString())
.serviceUrlTls(urlTls3.toString())
.brokerServiceUrl(pulsar3.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(true)
+ .brokerClientCertificateFilePath(clientCertFilePath)
+ .brokerClientKeyFilePath(clientKeyFilePath)
+ .brokerClientTrustCertsFilePath(caCertFilePath)
+ .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+ .brokerClientTlsKeyStore(clientKeyStorePath)
+ .brokerClientTlsKeyStorePassword(keyStorePassword)
+ .brokerClientTlsKeyStoreType(keyStoreType)
+ .brokerClientTlsTrustStore(clientTrustStorePath)
+ .brokerClientTlsTrustStorePassword(keyStorePassword)
+ .brokerClientTlsTrustStoreType(keyStoreType)
.build());
admin1.tenants().createTenant("pulsar",
@@ -181,12 +236,12 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2"));
- assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
- assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
- assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
- assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster(cluster1).getServiceUrl(), url1.toString());
+ assertEquals(admin2.clusters().getCluster(cluster2).getServiceUrl(), url2.toString());
+ assertEquals(admin2.clusters().getCluster(cluster3).getServiceUrl(), url3.toString());
+ assertEquals(admin2.clusters().getCluster(cluster1).getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster(cluster2).getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
+ assertEquals(admin2.clusters().getCluster(cluster3).getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
// Also create V1 namespace for compatibility check
admin1.clusters().createCluster("global", ClusterData.builder()
@@ -194,7 +249,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
.serviceUrlTls("https://global:8443")
.build());
admin1.namespaces().createNamespace("pulsar/global/ns");
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));
+ admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet(cluster1, cluster2, cluster3));
Thread.sleep(100);
log.info("--- ReplicatorTestBase::setup completed ---");
@@ -207,11 +262,11 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
}
public void setConfig1DefaultValue(){
- setConfigDefaults(config1, "r1", bkEnsemble1);
+ setConfigDefaults(config1, cluster1, bkEnsemble1);
}
public void setConfig2DefaultValue() {
- setConfigDefaults(config2, "r2", bkEnsemble2);
+ setConfigDefaults(config2, cluster2, bkEnsemble2);
}
private void setConfigDefaults(ServiceConfiguration config, String clusterName,
@@ -229,9 +284,16 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
- config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config.setTlsCertificateFilePath(brokerCertFilePath);
+ config.setTlsKeyFilePath(brokerFilePath);
+ config.setTlsTrustCertsFilePath(caCertFilePath);
+ config.setTlsEnabledWithKeyStore(tlsWithKeyStore);
+ config.setTlsKeyStore(brokerKeyStorePath);
+ config.setTlsKeyStoreType(keyStoreType);
+ config.setTlsKeyStorePassword(keyStorePassword);
+ config.setTlsTrustStore(brokerTrustStorePath);
+ config.setTlsTrustStoreType(keyStoreType);
+ config.setTlsTrustStorePassword(keyStorePassword);
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
index e1506d16188..49e4e795394 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.List;
+import java.util.Optional;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,10 +52,18 @@ public class ReplicatorTlsTest extends ReplicatorTestBase {
public void testReplicationClient() throws Exception {
log.info("--- Starting ReplicatorTlsTest::testReplicationClient ---");
for (BrokerService ns : List.of(ns1, ns2, ns3)) {
+ // load the client
+ ns.getReplicationClient(cluster1, Optional.of(admin1.clusters().getCluster(cluster1)));
+ ns.getReplicationClient(cluster2, Optional.of(admin1.clusters().getCluster(cluster2)));
+ ns.getReplicationClient(cluster3, Optional.of(admin1.clusters().getCluster(cluster3)));
+
+ // verify the client
ns.getReplicationClients().forEach((cluster, client) -> {
- assertTrue(((PulsarClientImpl) client).getConfiguration().isUseTls());
- assertEquals(((PulsarClientImpl) client).getConfiguration().getTlsTrustCertsFilePath(),
- TLS_SERVER_CERT_FILE_PATH);
+ ClientConfigurationData configuration = ((PulsarClientImpl) client).getConfiguration();
+ assertTrue(configuration.isUseTls());
+ assertEquals(configuration.getTlsTrustCertsFilePath(), caCertFilePath);
+ assertEquals(configuration.getTlsKeyFilePath(), clientKeyFilePath);
+ assertEquals(configuration.getTlsCertificateFilePath(), clientCertFilePath);
});
}
}