This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fffeb1  [issue 10693] use unify configuration from remote cluster for 
geo-replicator (#10710)
6fffeb1 is described below

commit 6fffeb190a61611a0db5980aba879c86ed63a93a
Author: linlinnn <[email protected]>
AuthorDate: Tue Jun 1 22:55:28 2021 +0800

    [issue 10693] use unify configuration from remote cluster for 
geo-replicator (#10710)
    
    Fixes #10693
    
    ### Motivation
    Use a unified configuration from the remote cluster for the geo-replicator.
---
 .../pulsar/broker/service/BrokerService.java       |  49 +++--
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  21 ++
 .../org/apache/pulsar/admin/cli/CmdClusters.java   | 239 +++++++++++++++------
 .../apache/pulsar/admin/cli/TestCmdClusters.java   |  89 ++++++++
 .../pulsar/common/policies/data/ClusterData.java   | 171 +++++++--------
 5 files changed, 402 insertions(+), 167 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1d8bff7..2fc9b29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1029,22 +1029,21 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                     
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                 }
-                if (pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
-                    clientBuilder
-                            
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? 
data.getBrokerServiceUrlTls()
-                                    : data.getServiceUrlTls())
-                            .enableTls(true)
-                            
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
-                    if 
(pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
-                        clientBuilder.useKeyStoreTls(true)
-                                
.tlsTrustStoreType(pulsar.getConfiguration().getBrokerClientTlsTrustStoreType())
-                                
.tlsTrustStorePath(pulsar.getConfiguration().getBrokerClientTlsTrustStore())
-                                
.tlsTrustStorePassword(pulsar.getConfiguration()
-                                        
.getBrokerClientTlsTrustStorePassword());
-                    } else {
-                        
clientBuilder.tlsTrustCertsFilePath(pulsar.getConfiguration()
-                                .getBrokerClientTrustCertsFilePath());
-                    }
+                String serviceUrlTls = 
isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
+                        : data.getServiceUrlTls();
+                if (data.isBrokerClientTlsEnabled()) {
+                    configTlsSettings(clientBuilder, serviceUrlTls,
+                            data.isBrokerClientTlsEnabledWithKeyStore(), 
data.isTlsAllowInsecureConnection(),
+                            data.getBrokerClientTlsTrustStoreType(), 
data.getBrokerClientTlsTrustStore(),
+                            data.getBrokerClientTlsTrustStorePassword(), 
data.getBrokerClientTrustCertsFilePath());
+                } else if 
(pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
+                    configTlsSettings(clientBuilder, serviceUrlTls,
+                            
pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore(),
+                            
pulsar.getConfiguration().isTlsAllowInsecureConnection(),
+                            
pulsar.getConfiguration().getBrokerClientTlsTrustStoreType(),
+                            
pulsar.getConfiguration().getBrokerClientTlsTrustStore(),
+                            
pulsar.getConfiguration().getBrokerClientTlsTrustStorePassword(),
+                            
pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
                 } else {
                     clientBuilder.serviceUrl(
                             isNotBlank(data.getBrokerServiceUrl()) ? 
data.getBrokerServiceUrl() : data.getServiceUrl());
@@ -1063,6 +1062,24 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         });
     }
 
+    private void configTlsSettings(ClientBuilder clientBuilder, String 
serviceUrl,
+                                   boolean brokerClientTlsEnabledWithKeyStore, 
boolean isTlsAllowInsecureConnection,
+                                   String brokerClientTlsTrustStoreType, 
String brokerClientTlsTrustStore,
+                                   String brokerClientTlsTrustStorePassword, 
String brokerClientTrustCertsFilePath) {
+        clientBuilder
+                .serviceUrl(serviceUrl)
+                .enableTls(true)
+                .allowTlsInsecureConnection(isTlsAllowInsecureConnection);
+        if (brokerClientTlsEnabledWithKeyStore) {
+            clientBuilder.useKeyStoreTls(true)
+                    .tlsTrustStoreType(brokerClientTlsTrustStoreType)
+                    .tlsTrustStorePath(brokerClientTlsTrustStore)
+                    .tlsTrustStorePassword(brokerClientTlsTrustStorePassword);
+        } else {
+            
clientBuilder.tlsTrustCertsFilePath(brokerClientTrustCertsFilePath);
+        }
+    }
+
     public PulsarAdmin getClusterPulsarAdmin(String cluster) {
         PulsarAdmin admin = clusterAdmins.get(cluster);
         if (admin != null) {
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 24a7f76..014977d 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -249,6 +249,27 @@ public class PulsarAdminToolTest {
 
         clusters.run(split("delete my-secure-cluster"));
         verify(mockClusters).deleteCluster("my-secure-cluster");
+
+        // test create cluster with tls
+        clusters = new CmdClusters(() -> admin);
+        clusters.run(split("create my-tls-cluster --url-secure 
https://my-service.url:4443 --tls-enable "
+                + "--tls-enable-keystore --tls-trust-store-type JKS 
--tls-trust-store /var/private/tls/client.truststore.jks "
+                + "--tls-trust-store-pwd clientpw"));
+        ClusterData data = new ClusterData(null, 
"https://my-service.url:4443");
+        data.setBrokerClientTlsEnabled(true)
+                .setBrokerClientTlsEnabledWithKeyStore(true)
+                .setBrokerClientTlsTrustStoreType("JKS")
+                
.setBrokerClientTlsTrustStore("/var/private/tls/client.truststore.jks")
+                .setBrokerClientTlsTrustStorePassword("clientpw");
+        verify(mockClusters).createCluster("my-tls-cluster", data);
+
+        clusters.run(split("update my-tls-cluster --url-secure 
https://my-service.url:4443 --tls-enable "
+                + "--tls-trust-certs-filepath /path/to/ca.cert.pem"));
+        data.setBrokerClientTlsEnabledWithKeyStore(false)
+                .setBrokerClientTlsTrustStore(null)
+                .setBrokerClientTlsTrustStorePassword(null)
+                .setBrokerClientTrustCertsFilePath("/path/to/ca.cert.pem");
+        verify(mockClusters).updateCluster("my-tls-cluster", data);
     }
 
     @Test
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index da7d9cf..fc538da 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -18,20 +18,21 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Sets;
 import java.util.Arrays;
 import java.util.function.Supplier;
-
 import org.apache.commons.lang3.StringUtils;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.collect.Sets;
+import lombok.Getter;
 
 @Parameters(commandDescription = "Operations about clusters")
 public class CmdClusters extends CmdBase {
@@ -55,76 +56,43 @@ public class CmdClusters extends CmdBase {
     }
 
     @Parameters(commandDescription = "Provisions a new cluster. This operation 
requires Pulsar super-user privileges")
-    private class Create extends CliCommand {
-        @Parameter(description = "cluster-name", required = true)
-        private java.util.List<String> params;
-
-        @Parameter(names = "--url", description = "service-url", required = 
false)
-        private String serviceUrl;
-
-        @Parameter(names = "--url-secure", description = "service-url for 
secure connection", required = false)
-        private String serviceUrlTls;
-        
-        @Parameter(names = "--broker-url", description = "broker-service-url", 
required = false)
-        private String brokerServiceUrl;
-        
-        @Parameter(names = "--broker-url-secure", description = 
"broker-service-url for secure connection", required = false)
-        private String brokerServiceUrlTls;
+    private class Create extends ClusterDetailsCommand {
 
-        @Parameter(names = "--proxy-url", description = "Proxy-service url 
when client would like to connect to broker via proxy.", required = false)
-        private String proxyServiceUrl;
-
-        @Parameter(names = "--auth-plugin", description = "authentication 
plugin", required = false)
-        private String authenticationPlugin;
-
-        @Parameter(names = "--auth-parameters", description = "authentication 
parameters", required = false)
-        private String authenticationParameters;
+        @Override
+        void runCmd() throws Exception {
+            String cluster = getOneArgument(params);
+            getAdmin().clusters().createCluster(cluster, clusterData);
+        }
 
-        @Parameter(names = "--proxy-protocol", description = "protocol to 
decide type of proxy routing eg: SNI", required = false)
-        private ProxyProtocol proxyProtocol;
+    }
 
-        void run() throws PulsarAdminException {
-            String cluster = getOneArgument(params);
-            getAdmin().clusters().createCluster(cluster,
-                    new ClusterData(serviceUrl, serviceUrlTls, 
brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl,
-                            authenticationPlugin, authenticationParameters, 
proxyProtocol));
+    protected void validateClusterData(ClusterData clusterData) {
+        if (clusterData.isBrokerClientTlsEnabled()) {
+            if (clusterData.isBrokerClientTlsEnabledWithKeyStore()) {
+                if 
(StringUtils.isAnyBlank(clusterData.getBrokerClientTlsTrustStoreType(), 
clusterData.getBrokerClientTlsTrustStore(),
+                        clusterData.getBrokerClientTlsTrustStorePassword())) {
+                    throw new RuntimeException(
+                            "You must specify tls-trust-store-type, 
tls-trust-store and tls-trust-store-pwd"
+                                    + " when enable tls-enable-keystore");
+                }
+            } else {
+                if 
(StringUtils.isBlank(clusterData.getBrokerClientTrustCertsFilePath())) {
+                    throw new RuntimeException("You must specify 
tls-trust-certs-filepath"
+                            + " when tls-enable-keystore is not enable");
+                }
+            }
         }
     }
 
     @Parameters(commandDescription = "Update the configuration for a cluster")
-    private class Update extends CliCommand {
-        @Parameter(description = "cluster-name", required = true)
-        private java.util.List<String> params;
-
-        @Parameter(names = "--url", description = "service-url", required = 
false)
-        private String serviceUrl;
+    private class Update extends ClusterDetailsCommand {
 
-        @Parameter(names = "--url-secure", description = "service-url for 
secure connection", required = false)
-        private String serviceUrlTls;
-        
-        @Parameter(names = "--broker-url", description = "broker-service-url", 
required = false)
-        private String brokerServiceUrl;
-        
-        @Parameter(names = "--broker-url-secure", description = 
"broker-service-url for secure connection", required = false)
-        private String brokerServiceUrlTls;
-
-        @Parameter(names = "--proxy-url", description = "Proxy-service url 
when client would like to connect to broker via proxy.", required = false)
-        private String proxyServiceUrl;
-
-        @Parameter(names = "--auth-plugin", description = "authentication 
plugin", required = false)
-        private String authenticationPlugin;
-
-        @Parameter(names = "--auth-parameters", description = "authentication 
parameters", required = false)
-        private String authenticationParameters;
-
-        @Parameter(names = "--proxy-protocol", description = "protocol to 
decide type of proxy routing eg: SNI", required = false)
-        private ProxyProtocol proxyProtocol;
-
-        void run() throws PulsarAdminException {
+        @Override
+        void runCmd() throws Exception {
             String cluster = getOneArgument(params);
-            getAdmin().clusters().updateCluster(cluster, new 
ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl,
-                    brokerServiceUrlTls, proxyServiceUrl, 
authenticationPlugin, authenticationParameters, proxyProtocol));
+            getAdmin().clusters().updateCluster(cluster, clusterData);
         }
+
     }
 
     @Parameters(commandDescription = "Deletes an existing cluster")
@@ -264,6 +232,145 @@ public class CmdClusters extends CmdBase {
             print(getAdmin().clusters().getFailureDomain(cluster, domainName));
         }
     }
+
+    /**
+     * Base command
+     */
+    @Getter
+    abstract class BaseCommand extends CliCommand {
+        @Override
+        void run() throws Exception {
+            try {
+                processArguments();
+            } catch (Exception e) {
+                System.err.println(e.getMessage());
+                System.err.println();
+                String chosenCommand = jcommander.getParsedCommand();
+                getUsageFormatter().usage(chosenCommand);
+                return;
+            }
+            runCmd();
+        }
+
+        void processArguments() throws Exception {
+        }
+
+        abstract void runCmd() throws Exception;
+    }
+
+    abstract class ClusterDetailsCommand extends BaseCommand {
+        @Parameter(description = "cluster-name", required = true)
+        protected java.util.List<String> params;
+
+        @Parameter(names = "--url", description = "service-url", required = 
false)
+        protected String serviceUrl;
+
+        @Parameter(names = "--url-secure", description = "service-url for 
secure connection", required = false)
+        protected String serviceUrlTls;
+
+        @Parameter(names = "--broker-url", description = "broker-service-url", 
required = false)
+        protected String brokerServiceUrl;
+
+        @Parameter(names = "--broker-url-secure", description = 
"broker-service-url for secure connection", required = false)
+        protected String brokerServiceUrlTls;
+
+        @Parameter(names = "--proxy-url", description = "Proxy-service url 
when client would like to connect to broker via proxy.", required = false)
+        protected String proxyServiceUrl;
+
+        @Parameter(names = "--auth-plugin", description = "authentication 
plugin", required = false)
+        protected String authenticationPlugin;
+
+        @Parameter(names = "--auth-parameters", description = "authentication 
parameters", required = false)
+        protected String authenticationParameters;
+
+        @Parameter(names = "--proxy-protocol", description = "protocol to 
decide type of proxy routing eg: SNI", required = false)
+        protected ProxyProtocol proxyProtocol;
+
+        @Parameter(names = "--tls-enable", description = "Enable tls 
connection", required = false)
+        protected Boolean brokerClientTlsEnabled;
+
+        @Parameter(names = "--tls-allow-insecure", description = "Allow 
insecure tls connection", required = false)
+        protected Boolean tlsAllowInsecureConnection;
+
+        @Parameter(names = "--tls-enable-keystore", description = "Whether use 
KeyStore type to authenticate", required = false)
+        protected Boolean brokerClientTlsEnabledWithKeyStore;
+
+        @Parameter(names = "--tls-trust-store-type", description = "TLS 
TrustStore type configuration for internal client eg: JKS", required = false)
+        protected String brokerClientTlsTrustStoreType;
+
+        @Parameter(names = "--tls-trust-store", description = "TLS TrustStore 
path for internal client", required = false)
+        protected String brokerClientTlsTrustStore;
+
+        @Parameter(names = "--tls-trust-store-pwd", description = "TLS 
TrustStore password for internal client", required = false)
+        protected String brokerClientTlsTrustStorePassword;
+
+        @Parameter(names = "--tls-trust-certs-filepath", description = "path 
for the trusted TLS certificate file", required = false)
+        protected String brokerClientTrustCertsFilePath;
+
+        @Parameter(names = "--cluster-config-file", description = "The path to 
a YAML config file specifying the "
+                + "cluster's configuration")
+        protected String clusterConfigFile;
+
+        protected ClusterData clusterData;
+
+        @Override
+        void processArguments() throws Exception {
+            super.processArguments();
+
+            if (null != clusterConfigFile) {
+                this.clusterData = CmdUtils.loadConfig(clusterConfigFile, 
ClusterData.class);
+            } else {
+                this.clusterData = new ClusterData();
+            }
+
+            if (serviceUrl != null) {
+                clusterData.setServiceUrl(serviceUrl);
+            }
+            if (serviceUrlTls != null) {
+                clusterData.setServiceUrlTls(serviceUrlTls);
+            }
+            if (brokerServiceUrl != null) {
+                clusterData.setBrokerServiceUrl(brokerServiceUrl);
+            }
+            if (brokerServiceUrlTls != null) {
+                clusterData.setBrokerServiceUrlTls(brokerServiceUrlTls);
+            }
+            if (proxyServiceUrl != null) {
+                clusterData.setProxyServiceUrl(proxyServiceUrl);
+            }
+            if (authenticationPlugin != null) {
+                clusterData.setAuthenticationPlugin(authenticationPlugin);
+            }
+            if (authenticationParameters != null) {
+                
clusterData.setAuthenticationParameters(authenticationParameters);
+            }
+            if (proxyProtocol != null) {
+                clusterData.setProxyProtocol(proxyProtocol);
+            }
+            if (brokerClientTlsEnabled != null) {
+                clusterData.setBrokerClientTlsEnabled(brokerClientTlsEnabled);
+            }
+            if (tlsAllowInsecureConnection != null) {
+                
clusterData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+            }
+            if (brokerClientTlsEnabledWithKeyStore != null) {
+                
clusterData.setBrokerClientTlsEnabledWithKeyStore(brokerClientTlsEnabledWithKeyStore);
+            }
+            if (brokerClientTlsTrustStoreType != null) {
+                
clusterData.setBrokerClientTlsTrustStoreType(brokerClientTlsTrustStoreType);
+            }
+            if (brokerClientTlsTrustStore != null) {
+                
clusterData.setBrokerClientTlsTrustStore(brokerClientTlsTrustStore);
+            }
+            if (brokerClientTlsTrustStorePassword != null) {
+                
clusterData.setBrokerClientTlsTrustStorePassword(brokerClientTlsTrustStorePassword);
+            }
+            if (brokerClientTrustCertsFilePath != null) {
+                
clusterData.setBrokerClientTrustCertsFilePath(brokerClientTrustCertsFilePath);
+            }
+            validateClusterData(clusterData);
+        }
+    }
     
     public CmdClusters(Supplier<PulsarAdmin> admin) {
         super("clusters", admin);
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
new file mode 100644
index 0000000..7cf17cf
--- /dev/null
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import org.apache.pulsar.client.api.ProxyProtocol;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+import java.io.File;
+import java.nio.file.Files;
+import org.apache.pulsar.admin.cli.utils.CmdUtils;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCmdClusters {
+
+    private PulsarAdmin pulsarAdmin;
+
+    private CmdClusters cmdClusters;
+
+    private Clusters clusters;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        pulsarAdmin = mock(PulsarAdmin.class);
+        clusters = mock(Clusters.class);
+        when(pulsarAdmin.clusters()).thenReturn(clusters);
+
+        cmdClusters = spy(new CmdClusters(() -> pulsarAdmin));
+    }
+
+    @Test
+    public void testCmdClusterConfigFile() throws Exception {
+        ClusterData clusterData = buildClusterData();
+        testCmdClusterConfigFile(clusterData, clusterData);
+    }
+
+    public void testCmdClusterConfigFile(ClusterData testClusterData, 
ClusterData expectedClusterData) throws Exception {
+        File file = Files.createTempFile("tmp_cluster", ".yaml").toFile();
+        new YAMLMapper().writeValue(file, testClusterData);
+        Assert.assertEquals(testClusterData, 
CmdUtils.loadConfig(file.getAbsolutePath(), ClusterData.class));
+
+        // test create cluster
+        cmdClusters.run(new String[]{"create", "test_cluster", 
"--cluster-config-file", file.getAbsolutePath()});
+        verify(clusters).createCluster(eq("test_cluster"), 
eq(expectedClusterData));
+
+        cmdClusters.run(new String[]{"update", "test_cluster", 
"--cluster-config-file", file.getAbsolutePath()});
+        verify(clusters).updateCluster(eq("test_cluster"), 
eq(expectedClusterData));
+    }
+
+    public ClusterData buildClusterData() {
+        return ClusterData.builder()
+                .serviceUrlTls("https://my-service.url:4443")
+                .authenticationPlugin("authenticationPlugin")
+                .authenticationParameters("authenticationParameters")
+                .proxyProtocol(ProxyProtocol.SNI)
+                .brokerClientTlsEnabled(true)
+                .brokerClientTlsEnabledWithKeyStore(true)
+                .brokerClientTlsTrustStoreType("JKS")
+                
.brokerClientTlsTrustStore("/var/private/tls/client.truststore.jks")
+                .brokerClientTlsTrustStorePassword("clientpw")
+                .build();
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index 7d5f497..25551fb 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -24,6 +24,12 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import java.util.LinkedHashSet;
 import java.util.Objects;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.experimental.Accessors;
 import org.apache.pulsar.client.api.ProxyProtocol;
 
 /**
@@ -33,6 +39,12 @@ import org.apache.pulsar.client.api.ProxyProtocol;
     value = "ClusterData",
     description = "The configuration data for a cluster"
 )
+@Getter
+@Setter
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Accessors(chain = true)
 public class ClusterData {
     @ApiModelProperty(
         name = "serviceUrl",
@@ -66,14 +78,14 @@ public class ClusterData {
     )
     private String proxyServiceUrl;
     @ApiModelProperty(
-            name = "authenticationPlugin",
-            value = "Authentication plugin when client would like to connect 
to cluster.",
-            example = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
+        name = "authenticationPlugin",
+        value = "Authentication plugin when client would like to connect to 
cluster.",
+        example = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
     )
     private String authenticationPlugin;
     @ApiModelProperty(
-            name = "authenticationParameters",
-            value = "Authentication parameters when client would like to 
connect to cluster."
+        name = "authenticationParameters",
+        value = "Authentication parameters when client would like to connect 
to cluster."
     )
     private String authenticationParameters;
     @ApiModelProperty(
@@ -90,9 +102,48 @@ public class ClusterData {
         value = "A set of peer cluster names"
     )
     private LinkedHashSet<String> peerClusterNames;
-
-    public ClusterData() {
-    }
+    @ApiModelProperty(
+        name = "brokerClientTlsEnabled",
+        value = "Enable TLS when talking with other brokers in the same 
cluster (admin operation)"
+                + " or different clusters (replication)"
+    )
+    private boolean brokerClientTlsEnabled = false;
+    @ApiModelProperty(
+        name = "tlsAllowInsecureConnection",
+        value = "Allow TLS connections to servers whose certificate cannot be"
+                + " be verified to have been signed by a trusted certificate"
+                + " authority."
+    )
+    private boolean tlsAllowInsecureConnection = false;
+    @ApiModelProperty(
+        name = "brokerClientTlsEnabledWithKeyStore",
+        value = "Whether internal client use KeyStore type to authenticate 
with other Pulsar brokers"
+    )
+    private boolean brokerClientTlsEnabledWithKeyStore = false;
+    @ApiModelProperty(
+        name = "brokerClientTlsTrustStoreType",
+        value = "TLS TrustStore type configuration for internal client: JKS, 
PKCS12"
+                + " used by the internal client to authenticate with Pulsar 
brokers",
+        example = "JKS"
+    )
+    private String brokerClientTlsTrustStoreType = "JKS";
+    @ApiModelProperty(
+        name = "brokerClientTlsTrustStore",
+        value = "TLS TrustStore path for internal client"
+                + " used by the internal client to authenticate with Pulsar 
brokers"
+    )
+    private String brokerClientTlsTrustStore;
+    @ApiModelProperty(
+        name = "brokerClientTlsTrustStorePassword",
+        value = "TLS TrustStore password for internal client"
+                + " used by the internal client to authenticate with Pulsar 
brokers"
+    )
+    private String brokerClientTlsTrustStorePassword;
+    @ApiModelProperty(
+        name = "brokerClientTrustCertsFilePath",
+        value = "Path for the trusted TLS certificate file for outgoing 
connection to a server (broker)"
+    )
+    private String brokerClientTrustCertsFilePath;
 
     public ClusterData(String serviceUrl) {
         this(serviceUrl, "");
@@ -144,78 +195,13 @@ public class ClusterData {
         this.proxyProtocol = other.proxyProtocol;
         this.authenticationPlugin = other.authenticationPlugin;
         this.authenticationParameters = other.authenticationParameters;
-    }
-
-    public String getServiceUrl() {
-        return serviceUrl;
-    }
-
-    public String getServiceUrlTls() {
-        return serviceUrlTls;
-    }
-
-    public void setServiceUrl(String serviceUrl) {
-        this.serviceUrl = serviceUrl;
-    }
-
-    public void setServiceUrlTls(String serviceUrlTls) {
-        this.serviceUrlTls = serviceUrlTls;
-    }
-
-    public String getBrokerServiceUrl() {
-        return brokerServiceUrl;
-    }
-
-    public void setBrokerServiceUrl(String brokerServiceUrl) {
-        this.brokerServiceUrl = brokerServiceUrl;
-    }
-
-    public String getBrokerServiceUrlTls() {
-        return brokerServiceUrlTls;
-    }
-
-    public void setBrokerServiceUrlTls(String brokerServiceUrlTls) {
-        this.brokerServiceUrlTls = brokerServiceUrlTls;
-    }
-
-    public String getProxyServiceUrl() {
-        return proxyServiceUrl;
-    }
-
-    public void setProxyServiceUrl(String proxyServiceUrl) {
-        this.proxyServiceUrl = proxyServiceUrl;
-    }
-
-    public ProxyProtocol getProxyProtocol() {
-        return proxyProtocol;
-    }
-
-    public void setProxyProtocol(ProxyProtocol proxyProtocol) {
-        this.proxyProtocol = proxyProtocol;
-    }
-
-    public LinkedHashSet<String> getPeerClusterNames() {
-        return peerClusterNames;
-    }
-
-    public String getAuthenticationPlugin() {
-        return authenticationPlugin;
-    }
-
-    public void setAuthenticationPlugin(String authenticationPlugin) {
-        this.authenticationPlugin = authenticationPlugin;
-    }
-
-    public String getAuthenticationParameters() {
-        return authenticationParameters;
-    }
-
-    public void setAuthenticationParameters(String authenticationParameters) {
-        this.authenticationParameters = authenticationParameters;
-    }
-
-    public void setPeerClusterNames(LinkedHashSet<String> peerClusterNames) {
-        this.peerClusterNames = peerClusterNames;
+        this.brokerClientTlsEnabled = other.brokerClientTlsEnabled;
+        this.tlsAllowInsecureConnection = other.tlsAllowInsecureConnection;
+        this.brokerClientTlsEnabledWithKeyStore = 
other.brokerClientTlsEnabledWithKeyStore;
+        this.brokerClientTlsTrustStoreType = 
other.brokerClientTlsTrustStoreType;
+        this.brokerClientTlsTrustStore = other.brokerClientTlsTrustStore;
+        this.brokerClientTlsTrustStorePassword = 
other.brokerClientTlsTrustStorePassword;
+        this.brokerClientTrustCertsFilePath = 
other.brokerClientTrustCertsFilePath;
     }
 
     @Override
@@ -228,8 +214,14 @@ public class ClusterData {
                     && Objects.equals(proxyServiceUrl, other.proxyServiceUrl)
                     && Objects.equals(proxyProtocol, other.proxyProtocol)
                     && Objects.equals(authenticationPlugin, 
other.authenticationPlugin)
-                    && Objects.equals(authenticationParameters, 
other.authenticationParameters);
-
+                    && Objects.equals(authenticationParameters, 
other.authenticationParameters)
+                    && Objects.equals(brokerClientTlsEnabled, 
other.brokerClientTlsEnabled)
+                    && Objects.equals(tlsAllowInsecureConnection, 
other.tlsAllowInsecureConnection)
+                    && Objects.equals(brokerClientTlsEnabledWithKeyStore, 
other.brokerClientTlsEnabledWithKeyStore)
+                    && Objects.equals(brokerClientTlsTrustStoreType, 
other.brokerClientTlsTrustStoreType)
+                    && Objects.equals(brokerClientTlsTrustStore, 
other.brokerClientTlsTrustStore)
+                    && Objects.equals(brokerClientTlsTrustStorePassword, 
other.brokerClientTlsTrustStorePassword)
+                    && Objects.equals(brokerClientTrustCertsFilePath, 
other.brokerClientTrustCertsFilePath);
         }
 
         return false;
@@ -237,7 +229,7 @@ public class ClusterData {
 
     @Override
     public int hashCode() {
-       return Objects.hash(this.toString());
+        return Objects.hash(this.toString());
     }
 
     @Override
@@ -249,8 +241,17 @@ public class ClusterData {
                 .add("brokerServiceUrlTls", brokerServiceUrlTls)
                 .add("proxyServiceUrl", proxyServiceUrl)
                 .add("proxyProtocol", proxyProtocol)
-                .add("peerClusterNames", 
peerClusterNames).add("authenticationPlugin", authenticationPlugin)
-                .add("authenticationParameters", 
authenticationParameters).toString();
+                .add("peerClusterNames", peerClusterNames)
+                .add("authenticationPlugin", authenticationPlugin)
+                .add("authenticationParameters", authenticationParameters)
+                .add("brokerClientTlsEnabled", brokerClientTlsEnabled)
+                .add("tlsAllowInsecureConnection", tlsAllowInsecureConnection)
+                .add("brokerClientTlsEnabledWithKeyStore", 
brokerClientTlsEnabledWithKeyStore)
+                .add("brokerClientTlsTrustStoreType", 
brokerClientTlsTrustStoreType)
+                .add("brokerClientTlsTrustStore", brokerClientTlsTrustStore)
+                .add("brokerClientTlsTrustStorePassword", 
brokerClientTlsTrustStorePassword)
+                .add("brokerClientTrustCertsFilePath", 
brokerClientTrustCertsFilePath)
+                .toString();
     }
 
 }

Reply via email to