This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c3357c2b44f [fix][broker] Fix geo-replication admin (#19686)
c3357c2b44f is described below

commit c3357c2b44fa58504eb971312647182e66fbad4a
Author: Zixuan Liu <[email protected]>
AuthorDate: Sun Mar 5 09:52:00 2023 +0800

    [fix][broker] Fix geo-replication admin (#19686)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../pulsar/broker/service/BrokerService.java       |  97 ++++++++++++++-----
 .../broker/service/ReplicatorAdminTlsTest.java     |  69 ++++++++++++++
 .../ReplicatorAdminTlsWithKeyStoreTest.java        |  74 +++++++++++++++
 .../pulsar/broker/service/ReplicatorTestBase.java  | 104 ++++++++++++++++-----
 .../pulsar/broker/service/ReplicatorTlsTest.java   |  21 +++--
 5 files changed, 311 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 69030b85868..2cee82a97ae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1296,6 +1296,31 @@ public class BrokerService implements Closeable {
         }
     }
 
+    private void configAdminTlsSettings(PulsarAdminBuilder adminBuilder, 
boolean brokerClientTlsEnabledWithKeyStore,
+                                        boolean isTlsAllowInsecureConnection,
+                                        String brokerClientTlsTrustStoreType, 
String brokerClientTlsTrustStore,
+                                        String 
brokerClientTlsTrustStorePassword, String brokerClientTlsKeyStoreType,
+                                        String brokerClientTlsKeyStore, String 
brokerClientTlsKeyStorePassword,
+                                        String brokerClientTrustCertsFilePath,
+                                        String brokerClientKeyFilePath, String 
brokerClientCertificateFilePath,
+                                        boolean 
isTlsHostnameVerificationEnabled) {
+        if (brokerClientTlsEnabledWithKeyStore) {
+            adminBuilder.useKeyStoreTls(true)
+                    .tlsTrustStoreType(brokerClientTlsTrustStoreType)
+                    .tlsTrustStorePath(brokerClientTlsTrustStore)
+                    .tlsTrustStorePassword(brokerClientTlsTrustStorePassword)
+                    .tlsKeyStoreType(brokerClientTlsKeyStoreType)
+                    .tlsKeyStorePath(brokerClientTlsKeyStore)
+                    .tlsKeyStorePassword(brokerClientTlsKeyStorePassword);
+        } else {
+            adminBuilder.tlsTrustCertsFilePath(brokerClientTrustCertsFilePath)
+                    .tlsKeyFilePath(brokerClientKeyFilePath)
+                    .tlsCertificateFilePath(brokerClientCertificateFilePath);
+        }
+        adminBuilder.allowTlsInsecureConnection(isTlsAllowInsecureConnection)
+                
.enableTlsHostnameVerification(isTlsHostnameVerificationEnabled);
+    }
+
     public PulsarAdmin getClusterPulsarAdmin(String cluster, 
Optional<ClusterData> clusterDataOp) {
         PulsarAdmin admin = clusterAdmins.get(cluster);
         if (admin != null) {
@@ -1305,38 +1330,60 @@ public class BrokerService implements Closeable {
             try {
                 ClusterData data = clusterDataOp
                         .orElseThrow(() -> new 
MetadataStoreException.NotFoundException(cluster));
+                PulsarAdminBuilder builder = PulsarAdmin.builder();
 
                 ServiceConfiguration conf = pulsar.getConfig();
-
-                boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && 
isNotBlank(data.getServiceUrlTls());
-                String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : 
data.getServiceUrl();
-                PulsarAdminBuilder builder = 
PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
-
                 // Apply all arbitrary configuration. This must be called 
before setting any fields annotated as
                 // @Secret on the ClientConfigurationData object because of 
the way they are serialized.
                 // See https://github.com/apache/pulsar/issues/8509 for more 
information.
                 
builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), 
"brokerClient_"));
 
-                builder.authentication(
-                        conf.getBrokerClientAuthenticationPlugin(),
-                        conf.getBrokerClientAuthenticationParameters());
-
-                if (isTlsUrl) {
-                    
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
-                            
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
-                    if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
-                        builder.useKeyStoreTls(true)
-                                
.tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
-                                
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
-                                
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
-                                
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
-                                
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
-                                
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
-                    } else {
-                        
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
-                                
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
-                                
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
-                    }
+                if (data.getAuthenticationPlugin() != null && 
data.getAuthenticationParameters() != null) {
+                    builder.authentication(data.getAuthenticationPlugin(), 
data.getAuthenticationParameters());
+                } else {
+                    
builder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
+                            
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
+                }
+
+                boolean isTlsEnabled = data.isBrokerClientTlsEnabled() || 
conf.isBrokerClientTlsEnabled();
+                if (isTlsEnabled && 
StringUtils.isEmpty(data.getServiceUrlTls())) {
+                    throw new IllegalArgumentException("serviceUrlTls is 
empty, brokerClientTlsEnabled: "
+                            + isTlsEnabled);
+                } else if (StringUtils.isEmpty(data.getServiceUrl())) {
+                    throw new IllegalArgumentException("serviceUrl is empty, 
brokerClientTlsEnabled: " + isTlsEnabled);
+                }
+                String adminApiUrl = isTlsEnabled ? data.getServiceUrlTls() : 
data.getServiceUrl();
+                builder.serviceHttpUrl(adminApiUrl);
+                if (data.isBrokerClientTlsEnabled()) {
+                    configAdminTlsSettings(builder,
+                            data.isBrokerClientTlsEnabledWithKeyStore(),
+                            data.isTlsAllowInsecureConnection(),
+                            data.getBrokerClientTlsTrustStoreType(),
+                            data.getBrokerClientTlsTrustStore(),
+                            data.getBrokerClientTlsTrustStorePassword(),
+                            data.getBrokerClientTlsKeyStoreType(),
+                            data.getBrokerClientTlsKeyStore(),
+                            data.getBrokerClientTlsKeyStorePassword(),
+                            data.getBrokerClientTrustCertsFilePath(),
+                            data.getBrokerClientKeyFilePath(),
+                            data.getBrokerClientCertificateFilePath(),
+                            conf.isTlsHostnameVerificationEnabled()
+                    );
+                } else if (conf.isBrokerClientTlsEnabled()) {
+                    configAdminTlsSettings(builder,
+                            conf.isBrokerClientTlsEnabledWithKeyStore(),
+                            conf.isTlsAllowInsecureConnection(),
+                            conf.getBrokerClientTlsTrustStoreType(),
+                            conf.getBrokerClientTlsTrustStore(),
+                            conf.getBrokerClientTlsTrustStorePassword(),
+                            conf.getBrokerClientTlsKeyStoreType(),
+                            conf.getBrokerClientTlsKeyStore(),
+                            conf.getBrokerClientTlsKeyStorePassword(),
+                            conf.getBrokerClientTrustCertsFilePath(),
+                            conf.getBrokerClientKeyFilePath(),
+                            conf.getBrokerClientCertificateFilePath(),
+                            conf.isTlsHostnameVerificationEnabled()
+                    );
                 }
 
                 // most of the admin request requires to make zk-call so, keep 
the max read-timeout based on
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
new file mode 100644
index 00000000000..51c63c7c40b
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.Optional;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ReplicatorAdminTlsTest extends ReplicatorTestBase {
+
+    @Override
+    @BeforeClass(timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Test
+    public void testReplicationAdmin() throws Exception {
+        for (BrokerService ns : List.of(ns1, ns2, ns3)) {
+            // load the admin
+            ns.getClusterPulsarAdmin(cluster1, 
Optional.of(admin1.clusters().getCluster(cluster1)));
+            ns.getClusterPulsarAdmin(cluster2, 
Optional.of(admin1.clusters().getCluster(cluster2)));
+            ns.getClusterPulsarAdmin(cluster3, 
Optional.of(admin1.clusters().getCluster(cluster3)));
+
+            // verify the admin
+            ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins = 
ns.getClusterAdmins();
+            assertFalse(clusterAdmins.isEmpty());
+            clusterAdmins.forEach((cluster, admin) -> {
+                ClientConfigurationData clientConfigData = ((PulsarAdminImpl) 
admin).getClientConfigData();
+                assertEquals(clientConfigData.getTlsTrustCertsFilePath(), 
caCertFilePath);
+                assertEquals(clientConfigData.getTlsKeyFilePath(), 
clientKeyFilePath);
+                assertEquals(clientConfigData.getTlsCertificateFilePath(), 
clientCertFilePath);
+                assertTrue(clientConfigData.isUseTls());
+            });
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
new file mode 100644
index 00000000000..703520475b1
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorAdminTlsWithKeyStoreTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.Optional;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ReplicatorAdminTlsWithKeyStoreTest extends ReplicatorTestBase {
+
+    @Override
+    @BeforeClass(timeOut = 300000)
+    public void setup() throws Exception {
+        tlsWithKeyStore = true;
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Test
+    public void testReplicationAdmin() throws Exception {
+        for (BrokerService ns : List.of(ns1, ns2, ns3)) {
+            // load the admin
+            ns.getClusterPulsarAdmin(cluster1, 
Optional.of(admin1.clusters().getCluster(cluster1)));
+            ns.getClusterPulsarAdmin(cluster2, 
Optional.of(admin1.clusters().getCluster(cluster2)));
+            ns.getClusterPulsarAdmin(cluster3, 
Optional.of(admin1.clusters().getCluster(cluster3)));
+
+            // verify the admin
+            ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins = 
ns.getClusterAdmins();
+            assertFalse(clusterAdmins.isEmpty());
+            clusterAdmins.forEach((cluster, admin) -> {
+                ClientConfigurationData clientConfigData = ((PulsarAdminImpl) 
admin).getClientConfigData();
+                assertEquals(clientConfigData.getTlsKeyStorePath(), 
clientKeyStorePath);
+                assertEquals(clientConfigData.getTlsKeyStorePassword(), 
keyStorePassword);
+                assertEquals(clientConfigData.getTlsKeyStoreType(), 
keyStoreType);
+                assertEquals(clientConfigData.getTlsTrustStorePath(), 
clientTrustStorePath);
+                assertEquals(clientConfigData.getTlsTrustStorePassword(), 
keyStorePassword);
+                assertEquals(clientConfigData.getTlsTrustStoreType(), 
keyStoreType);
+                assertTrue(clientConfigData.isUseKeyStoreTls());
+                assertTrue(clientConfigData.isUseTls());
+            });
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 9768fbf28cd..b6530c78202 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -20,11 +20,9 @@ package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-
 import com.google.common.collect.Sets;
-
+import com.google.common.io.Resources;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.net.URL;
 import java.util.Optional;
 import java.util.Set;
@@ -33,11 +31,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -48,7 +43,9 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.slf4j.Logger;
@@ -86,8 +83,28 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
 
     static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
 
-    protected static final String TLS_SERVER_CERT_FILE_PATH = 
"./src/test/resources/certificate/server.crt";
-    protected static final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/certificate/server.key";
+    protected final String brokerCertFilePath = 
Resources.getResource("certificate-authority/server-keys/broker.cert.pem").getPath();
+    protected final String brokerFilePath = 
Resources.getResource("certificate-authority/server-keys/broker.key-pk8.pem").getPath();
+    protected final String clientCertFilePath = 
Resources.getResource("certificate-authority/client-keys/admin.cert.pem").getPath();
+    protected final String clientKeyFilePath = 
Resources.getResource("certificate-authority/client-keys/admin.key-pk8.pem").getPath();
+    protected final String caCertFilePath = 
Resources.getResource("certificate-authority/certs/ca.cert.pem").getPath();
+
+    // KEYSTORE
+    protected boolean tlsWithKeyStore = false;
+    protected final static String brokerKeyStorePath =
+            
Resources.getResource("certificate-authority/jks/broker.keystore.jks").getPath();
+    protected final static String brokerTrustStorePath =
+            
Resources.getResource("certificate-authority/jks/broker.truststore.jks").getPath();
+    protected final static String clientKeyStorePath =
+            
Resources.getResource("certificate-authority/jks/client.keystore.jks").getPath();
+    protected final static String clientTrustStorePath =
+            
Resources.getResource("certificate-authority/jks/client.truststore.jks").getPath();
+    protected final static String keyStoreType = "JKS";
+    protected final static String keyStorePassword = "111111";
+
+    protected final String cluster1 = "r1";
+    protected final String cluster2 = "r2";
+    protected final String cluster3 = "r3";
 
     // Default frequency
     public int getBrokerServicePurgeInactiveFrequency() {
@@ -156,23 +173,56 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
 
         // Provision the global namespace
-        admin1.clusters().createCluster("r1", ClusterData.builder()
+        admin1.clusters().createCluster(cluster1, ClusterData.builder()
                 .serviceUrl(url1.toString())
                 .serviceUrlTls(urlTls1.toString())
                 .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
                 .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(true)
+                .brokerClientCertificateFilePath(clientCertFilePath)
+                .brokerClientKeyFilePath(clientKeyFilePath)
+                .brokerClientTrustCertsFilePath(caCertFilePath)
+                .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+                .brokerClientTlsKeyStore(clientKeyStorePath)
+                .brokerClientTlsKeyStorePassword(keyStorePassword)
+                .brokerClientTlsKeyStoreType(keyStoreType)
+                .brokerClientTlsTrustStore(clientTrustStorePath)
+                .brokerClientTlsTrustStorePassword(keyStorePassword)
+                .brokerClientTlsTrustStoreType(keyStoreType)
                 .build());
-        admin1.clusters().createCluster("r2", ClusterData.builder()
+        admin1.clusters().createCluster(cluster2, ClusterData.builder()
                 .serviceUrl(url2.toString())
                 .serviceUrlTls(urlTls2.toString())
                 .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
                 .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(true)
+                .brokerClientCertificateFilePath(clientCertFilePath)
+                .brokerClientKeyFilePath(clientKeyFilePath)
+                .brokerClientTrustCertsFilePath(caCertFilePath)
+                .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+                .brokerClientTlsKeyStore(clientKeyStorePath)
+                .brokerClientTlsKeyStorePassword(keyStorePassword)
+                .brokerClientTlsKeyStoreType(keyStoreType)
+                .brokerClientTlsTrustStore(clientTrustStorePath)
+                .brokerClientTlsTrustStorePassword(keyStorePassword)
+                .brokerClientTlsTrustStoreType(keyStoreType)
                 .build());
-        admin1.clusters().createCluster("r3", ClusterData.builder()
+        admin1.clusters().createCluster(cluster3, ClusterData.builder()
                 .serviceUrl(url3.toString())
                 .serviceUrlTls(urlTls3.toString())
                 .brokerServiceUrl(pulsar3.getBrokerServiceUrl())
                 .brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(true)
+                .brokerClientCertificateFilePath(clientCertFilePath)
+                .brokerClientKeyFilePath(clientKeyFilePath)
+                .brokerClientTrustCertsFilePath(caCertFilePath)
+                .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
+                .brokerClientTlsKeyStore(clientKeyStorePath)
+                .brokerClientTlsKeyStorePassword(keyStorePassword)
+                .brokerClientTlsKeyStoreType(keyStoreType)
+                .brokerClientTlsTrustStore(clientTrustStorePath)
+                .brokerClientTlsTrustStorePassword(keyStorePassword)
+                .brokerClientTlsTrustStoreType(keyStoreType)
                 .build());
 
         admin1.tenants().createTenant("pulsar",
@@ -180,12 +230,12 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", 
"r2", "r3"));
         admin1.namespaces().createNamespace("pulsar/ns1", 
Sets.newHashSet("r1", "r2"));
 
-        assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), 
url1.toString());
-        assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), 
url2.toString());
-        assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), 
url3.toString());
-        assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), 
pulsar1.getBrokerServiceUrl());
-        assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), 
pulsar2.getBrokerServiceUrl());
-        assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), 
pulsar3.getBrokerServiceUrl());
+        assertEquals(admin2.clusters().getCluster(cluster1).getServiceUrl(), 
url1.toString());
+        assertEquals(admin2.clusters().getCluster(cluster2).getServiceUrl(), 
url2.toString());
+        assertEquals(admin2.clusters().getCluster(cluster3).getServiceUrl(), 
url3.toString());
+        
assertEquals(admin2.clusters().getCluster(cluster1).getBrokerServiceUrl(), 
pulsar1.getBrokerServiceUrl());
+        
assertEquals(admin2.clusters().getCluster(cluster2).getBrokerServiceUrl(), 
pulsar2.getBrokerServiceUrl());
+        
assertEquals(admin2.clusters().getCluster(cluster3).getBrokerServiceUrl(), 
pulsar3.getBrokerServiceUrl());
 
         // Also create V1 namespace for compatibility check
         admin1.clusters().createCluster("global", ClusterData.builder()
@@ -193,7 +243,8 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
                 .serviceUrlTls("https://global:8443";)
                 .build());
         admin1.namespaces().createNamespace("pulsar/global/ns");
-        
admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", 
Sets.newHashSet("r1", "r2", "r3"));
+        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns",
+                Sets.newHashSet(cluster1, cluster2, cluster3));
 
         Thread.sleep(100);
         log.info("--- ReplicatorTestBase::setup completed ---");
@@ -206,11 +257,11 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
     }
 
     public void setConfig1DefaultValue(){
-        setConfigDefaults(config1, "r1", bkEnsemble1);
+        setConfigDefaults(config1, cluster1, bkEnsemble1);
     }
 
     public void setConfig2DefaultValue() {
-        setConfigDefaults(config2, "r2", bkEnsemble2);
+        setConfigDefaults(config2, cluster2, bkEnsemble2);
     }
 
     private void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
@@ -228,9 +279,16 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
         config.setBrokerServicePortTls(Optional.of(0));
-        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsCertificateFilePath(brokerCertFilePath);
+        config.setTlsKeyFilePath(brokerFilePath);
+        config.setTlsTrustCertsFilePath(caCertFilePath);
+        config.setTlsEnabledWithKeyStore(tlsWithKeyStore);
+        config.setTlsKeyStore(brokerKeyStorePath);
+        config.setTlsKeyStoreType(keyStoreType);
+        config.setTlsKeyStorePassword(keyStorePassword);
+        config.setTlsTrustStore(brokerTrustStorePath);
+        config.setTlsTrustStoreType(keyStoreType);
+        config.setTlsTrustStorePassword(keyStorePassword);
         
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
         config.setDefaultNumberOfNamespaceBundles(1);
         config.setAllowAutoTopicCreationType("non-partitioned");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
index 7c28f183c99..4dbcd3559c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTlsTest.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
+import java.util.List;
+import java.util.Optional;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import org.testng.collections.Lists;
 
 @Test(groups = "broker")
 public class ReplicatorTlsTest extends ReplicatorTestBase {
@@ -50,11 +51,19 @@ public class ReplicatorTlsTest extends ReplicatorTestBase {
     @Test
     public void testReplicationClient() throws Exception {
         log.info("--- Starting ReplicatorTlsTest::testReplicationClient ---");
-        for (BrokerService ns : Lists.newArrayList(ns1, ns2, ns3)) {
+        for (BrokerService ns : List.of(ns1, ns2, ns3)) {
+            // load the client
+            ns.getReplicationClient(cluster1, 
Optional.of(admin1.clusters().getCluster(cluster1)));
+            ns.getReplicationClient(cluster2, 
Optional.of(admin1.clusters().getCluster(cluster2)));
+            ns.getReplicationClient(cluster3, 
Optional.of(admin1.clusters().getCluster(cluster3)));
+
+            // verify the client
             ns.getReplicationClients().forEach((cluster, client) -> {
-                assertTrue(((PulsarClientImpl) 
client).getConfiguration().isUseTls());
-                assertEquals(((PulsarClientImpl) 
client).getConfiguration().getTlsTrustCertsFilePath(),
-                        TLS_SERVER_CERT_FILE_PATH);
+                ClientConfigurationData configuration = ((PulsarClientImpl) 
client).getConfiguration();
+                assertTrue(configuration.isUseTls());
+                assertEquals(configuration.getTlsTrustCertsFilePath(), 
caCertFilePath);
+                assertEquals(configuration.getTlsKeyFilePath(), 
clientKeyFilePath);
+                assertEquals(configuration.getTlsCertificateFilePath(), 
clientCertFilePath);
             });
         }
     }

Reply via email to