This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6fffeb1 [issue 10693] use unify configuration from remote cluster for
geo-replicator (#10710)
6fffeb1 is described below
commit 6fffeb190a61611a0db5980aba879c86ed63a93a
Author: linlinnn <[email protected]>
AuthorDate: Tue Jun 1 22:55:28 2021 +0800
[issue 10693] use unify configuration from remote cluster for
geo-replicator (#10710)
Fixes #10693
### Motivation
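Use a unified TLS configuration for the geo-replicator: the replication
client now takes its TLS settings from the remote cluster's ClusterData,
falling back to the local broker configuration when the cluster data does
not enable TLS. The settings can be supplied through the new --tls-* and
--cluster-config-file options of `pulsar-admin clusters create/update`.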
---
.../pulsar/broker/service/BrokerService.java | 49 +++--
.../pulsar/admin/cli/PulsarAdminToolTest.java | 21 ++
.../org/apache/pulsar/admin/cli/CmdClusters.java | 239 +++++++++++++++------
.../apache/pulsar/admin/cli/TestCmdClusters.java | 89 ++++++++
.../pulsar/common/policies/data/ClusterData.java | 171 +++++++--------
5 files changed, 402 insertions(+), 167 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1d8bff7..2fc9b29 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1029,22 +1029,21 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
- if (pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
- clientBuilder
-
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ?
data.getBrokerServiceUrlTls()
- : data.getServiceUrlTls())
- .enableTls(true)
-
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
- if
(pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
- clientBuilder.useKeyStoreTls(true)
-
.tlsTrustStoreType(pulsar.getConfiguration().getBrokerClientTlsTrustStoreType())
-
.tlsTrustStorePath(pulsar.getConfiguration().getBrokerClientTlsTrustStore())
-
.tlsTrustStorePassword(pulsar.getConfiguration()
-
.getBrokerClientTlsTrustStorePassword());
- } else {
-
clientBuilder.tlsTrustCertsFilePath(pulsar.getConfiguration()
- .getBrokerClientTrustCertsFilePath());
- }
+ String serviceUrlTls =
isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
+ : data.getServiceUrlTls();
+ if (data.isBrokerClientTlsEnabled()) {
+ configTlsSettings(clientBuilder, serviceUrlTls,
+ data.isBrokerClientTlsEnabledWithKeyStore(),
data.isTlsAllowInsecureConnection(),
+ data.getBrokerClientTlsTrustStoreType(),
data.getBrokerClientTlsTrustStore(),
+ data.getBrokerClientTlsTrustStorePassword(),
data.getBrokerClientTrustCertsFilePath());
+ } else if
(pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
+ configTlsSettings(clientBuilder, serviceUrlTls,
+
pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore(),
+
pulsar.getConfiguration().isTlsAllowInsecureConnection(),
+
pulsar.getConfiguration().getBrokerClientTlsTrustStoreType(),
+
pulsar.getConfiguration().getBrokerClientTlsTrustStore(),
+
pulsar.getConfiguration().getBrokerClientTlsTrustStorePassword(),
+
pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
} else {
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ?
data.getBrokerServiceUrl() : data.getServiceUrl());
@@ -1063,6 +1062,24 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
});
}
+ private void configTlsSettings(ClientBuilder clientBuilder, String
serviceUrl,
+ boolean brokerClientTlsEnabledWithKeyStore,
boolean isTlsAllowInsecureConnection,
+ String brokerClientTlsTrustStoreType,
String brokerClientTlsTrustStore,
+ String brokerClientTlsTrustStorePassword,
String brokerClientTrustCertsFilePath) {
+ clientBuilder
+ .serviceUrl(serviceUrl)
+ .enableTls(true)
+ .allowTlsInsecureConnection(isTlsAllowInsecureConnection);
+ if (brokerClientTlsEnabledWithKeyStore) {
+ clientBuilder.useKeyStoreTls(true)
+ .tlsTrustStoreType(brokerClientTlsTrustStoreType)
+ .tlsTrustStorePath(brokerClientTlsTrustStore)
+ .tlsTrustStorePassword(brokerClientTlsTrustStorePassword);
+ } else {
+
clientBuilder.tlsTrustCertsFilePath(brokerClientTrustCertsFilePath);
+ }
+ }
+
public PulsarAdmin getClusterPulsarAdmin(String cluster) {
PulsarAdmin admin = clusterAdmins.get(cluster);
if (admin != null) {
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 24a7f76..014977d 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -249,6 +249,27 @@ public class PulsarAdminToolTest {
clusters.run(split("delete my-secure-cluster"));
verify(mockClusters).deleteCluster("my-secure-cluster");
+
+ // test create cluster with tls
+ clusters = new CmdClusters(() -> admin);
+ clusters.run(split("create my-tls-cluster --url-secure
https://my-service.url:4443 --tls-enable "
+ + "--tls-enable-keystore --tls-trust-store-type JKS
--tls-trust-store /var/private/tls/client.truststore.jks "
+ + "--tls-trust-store-pwd clientpw"));
+ ClusterData data = new ClusterData(null,
"https://my-service.url:4443");
+ data.setBrokerClientTlsEnabled(true)
+ .setBrokerClientTlsEnabledWithKeyStore(true)
+ .setBrokerClientTlsTrustStoreType("JKS")
+
.setBrokerClientTlsTrustStore("/var/private/tls/client.truststore.jks")
+ .setBrokerClientTlsTrustStorePassword("clientpw");
+ verify(mockClusters).createCluster("my-tls-cluster", data);
+
+ clusters.run(split("update my-tls-cluster --url-secure
https://my-service.url:4443 --tls-enable "
+ + "--tls-trust-certs-filepath /path/to/ca.cert.pem"));
+ data.setBrokerClientTlsEnabledWithKeyStore(false)
+ .setBrokerClientTlsTrustStore(null)
+ .setBrokerClientTlsTrustStorePassword(null)
+ .setBrokerClientTrustCertsFilePath("/path/to/ca.cert.pem");
+ verify(mockClusters).updateCluster("my-tls-cluster", data);
}
@Test
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index da7d9cf..fc538da 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -18,20 +18,21 @@
*/
package org.apache.pulsar.admin.cli;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.function.Supplier;
-
import org.apache.commons.lang3.StringUtils;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.collect.Sets;
+import lombok.Getter;
@Parameters(commandDescription = "Operations about clusters")
public class CmdClusters extends CmdBase {
@@ -55,76 +56,43 @@ public class CmdClusters extends CmdBase {
}
@Parameters(commandDescription = "Provisions a new cluster. This operation
requires Pulsar super-user privileges")
- private class Create extends CliCommand {
- @Parameter(description = "cluster-name", required = true)
- private java.util.List<String> params;
-
- @Parameter(names = "--url", description = "service-url", required =
false)
- private String serviceUrl;
-
- @Parameter(names = "--url-secure", description = "service-url for
secure connection", required = false)
- private String serviceUrlTls;
-
- @Parameter(names = "--broker-url", description = "broker-service-url",
required = false)
- private String brokerServiceUrl;
-
- @Parameter(names = "--broker-url-secure", description =
"broker-service-url for secure connection", required = false)
- private String brokerServiceUrlTls;
+ private class Create extends ClusterDetailsCommand {
- @Parameter(names = "--proxy-url", description = "Proxy-service url
when client would like to connect to broker via proxy.", required = false)
- private String proxyServiceUrl;
-
- @Parameter(names = "--auth-plugin", description = "authentication
plugin", required = false)
- private String authenticationPlugin;
-
- @Parameter(names = "--auth-parameters", description = "authentication
parameters", required = false)
- private String authenticationParameters;
+ @Override
+ void runCmd() throws Exception {
+ String cluster = getOneArgument(params);
+ getAdmin().clusters().createCluster(cluster, clusterData);
+ }
- @Parameter(names = "--proxy-protocol", description = "protocol to
decide type of proxy routing eg: SNI", required = false)
- private ProxyProtocol proxyProtocol;
+ }
- void run() throws PulsarAdminException {
- String cluster = getOneArgument(params);
- getAdmin().clusters().createCluster(cluster,
- new ClusterData(serviceUrl, serviceUrlTls,
brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl,
- authenticationPlugin, authenticationParameters,
proxyProtocol));
+ protected void validateClusterData(ClusterData clusterData) {
+ if (clusterData.isBrokerClientTlsEnabled()) {
+ if (clusterData.isBrokerClientTlsEnabledWithKeyStore()) {
+ if
(StringUtils.isAnyBlank(clusterData.getBrokerClientTlsTrustStoreType(),
clusterData.getBrokerClientTlsTrustStore(),
+ clusterData.getBrokerClientTlsTrustStorePassword())) {
+ throw new RuntimeException(
+ "You must specify tls-trust-store-type,
tls-trust-store and tls-trust-store-pwd"
+ + " when enable tls-enable-keystore");
+ }
+ } else {
+ if
(StringUtils.isBlank(clusterData.getBrokerClientTrustCertsFilePath())) {
+ throw new RuntimeException("You must specify
tls-trust-certs-filepath"
+ + " when tls-enable-keystore is not enable");
+ }
+ }
}
}
@Parameters(commandDescription = "Update the configuration for a cluster")
- private class Update extends CliCommand {
- @Parameter(description = "cluster-name", required = true)
- private java.util.List<String> params;
-
- @Parameter(names = "--url", description = "service-url", required =
false)
- private String serviceUrl;
+ private class Update extends ClusterDetailsCommand {
- @Parameter(names = "--url-secure", description = "service-url for
secure connection", required = false)
- private String serviceUrlTls;
-
- @Parameter(names = "--broker-url", description = "broker-service-url",
required = false)
- private String brokerServiceUrl;
-
- @Parameter(names = "--broker-url-secure", description =
"broker-service-url for secure connection", required = false)
- private String brokerServiceUrlTls;
-
- @Parameter(names = "--proxy-url", description = "Proxy-service url
when client would like to connect to broker via proxy.", required = false)
- private String proxyServiceUrl;
-
- @Parameter(names = "--auth-plugin", description = "authentication
plugin", required = false)
- private String authenticationPlugin;
-
- @Parameter(names = "--auth-parameters", description = "authentication
parameters", required = false)
- private String authenticationParameters;
-
- @Parameter(names = "--proxy-protocol", description = "protocol to
decide type of proxy routing eg: SNI", required = false)
- private ProxyProtocol proxyProtocol;
-
- void run() throws PulsarAdminException {
+ @Override
+ void runCmd() throws Exception {
String cluster = getOneArgument(params);
- getAdmin().clusters().updateCluster(cluster, new
ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl,
- brokerServiceUrlTls, proxyServiceUrl,
authenticationPlugin, authenticationParameters, proxyProtocol));
+ getAdmin().clusters().updateCluster(cluster, clusterData);
}
+
}
@Parameters(commandDescription = "Deletes an existing cluster")
@@ -264,6 +232,145 @@ public class CmdClusters extends CmdBase {
print(getAdmin().clusters().getFailureDomain(cluster, domainName));
}
}
+
+ /**
+ * Base command
+ */
+ @Getter
+ abstract class BaseCommand extends CliCommand {
+ @Override
+ void run() throws Exception {
+ try {
+ processArguments();
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ System.err.println();
+ String chosenCommand = jcommander.getParsedCommand();
+ getUsageFormatter().usage(chosenCommand);
+ return;
+ }
+ runCmd();
+ }
+
+ void processArguments() throws Exception {
+ }
+
+ abstract void runCmd() throws Exception;
+ }
+
+ abstract class ClusterDetailsCommand extends BaseCommand {
+ @Parameter(description = "cluster-name", required = true)
+ protected java.util.List<String> params;
+
+ @Parameter(names = "--url", description = "service-url", required =
false)
+ protected String serviceUrl;
+
+ @Parameter(names = "--url-secure", description = "service-url for
secure connection", required = false)
+ protected String serviceUrlTls;
+
+ @Parameter(names = "--broker-url", description = "broker-service-url",
required = false)
+ protected String brokerServiceUrl;
+
+ @Parameter(names = "--broker-url-secure", description =
"broker-service-url for secure connection", required = false)
+ protected String brokerServiceUrlTls;
+
+ @Parameter(names = "--proxy-url", description = "Proxy-service url
when client would like to connect to broker via proxy.", required = false)
+ protected String proxyServiceUrl;
+
+ @Parameter(names = "--auth-plugin", description = "authentication
plugin", required = false)
+ protected String authenticationPlugin;
+
+ @Parameter(names = "--auth-parameters", description = "authentication
parameters", required = false)
+ protected String authenticationParameters;
+
+ @Parameter(names = "--proxy-protocol", description = "protocol to
decide type of proxy routing eg: SNI", required = false)
+ protected ProxyProtocol proxyProtocol;
+
+ @Parameter(names = "--tls-enable", description = "Enable tls
connection", required = false)
+ protected Boolean brokerClientTlsEnabled;
+
+ @Parameter(names = "--tls-allow-insecure", description = "Allow
insecure tls connection", required = false)
+ protected Boolean tlsAllowInsecureConnection;
+
+ @Parameter(names = "--tls-enable-keystore", description = "Whether use
KeyStore type to authenticate", required = false)
+ protected Boolean brokerClientTlsEnabledWithKeyStore;
+
+ @Parameter(names = "--tls-trust-store-type", description = "TLS
TrustStore type configuration for internal client eg: JKS", required = false)
+ protected String brokerClientTlsTrustStoreType;
+
+ @Parameter(names = "--tls-trust-store", description = "TLS TrustStore
path for internal client", required = false)
+ protected String brokerClientTlsTrustStore;
+
+ @Parameter(names = "--tls-trust-store-pwd", description = "TLS
TrustStore password for internal client", required = false)
+ protected String brokerClientTlsTrustStorePassword;
+
+ @Parameter(names = "--tls-trust-certs-filepath", description = "path
for the trusted TLS certificate file", required = false)
+ protected String brokerClientTrustCertsFilePath;
+
+ @Parameter(names = "--cluster-config-file", description = "The path to
a YAML config file specifying the "
+ + "cluster's configuration")
+ protected String clusterConfigFile;
+
+ protected ClusterData clusterData;
+
+ @Override
+ void processArguments() throws Exception {
+ super.processArguments();
+
+ if (null != clusterConfigFile) {
+ this.clusterData = CmdUtils.loadConfig(clusterConfigFile,
ClusterData.class);
+ } else {
+ this.clusterData = new ClusterData();
+ }
+
+ if (serviceUrl != null) {
+ clusterData.setServiceUrl(serviceUrl);
+ }
+ if (serviceUrlTls != null) {
+ clusterData.setServiceUrlTls(serviceUrlTls);
+ }
+ if (brokerServiceUrl != null) {
+ clusterData.setBrokerServiceUrl(brokerServiceUrl);
+ }
+ if (brokerServiceUrlTls != null) {
+ clusterData.setBrokerServiceUrlTls(brokerServiceUrlTls);
+ }
+ if (proxyServiceUrl != null) {
+ clusterData.setProxyServiceUrl(proxyServiceUrl);
+ }
+ if (authenticationPlugin != null) {
+ clusterData.setAuthenticationPlugin(authenticationPlugin);
+ }
+ if (authenticationParameters != null) {
+
clusterData.setAuthenticationParameters(authenticationParameters);
+ }
+ if (proxyProtocol != null) {
+ clusterData.setProxyProtocol(proxyProtocol);
+ }
+ if (brokerClientTlsEnabled != null) {
+ clusterData.setBrokerClientTlsEnabled(brokerClientTlsEnabled);
+ }
+ if (tlsAllowInsecureConnection != null) {
+
clusterData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+ }
+ if (brokerClientTlsEnabledWithKeyStore != null) {
+
clusterData.setBrokerClientTlsEnabledWithKeyStore(brokerClientTlsEnabledWithKeyStore);
+ }
+ if (brokerClientTlsTrustStoreType != null) {
+
clusterData.setBrokerClientTlsTrustStoreType(brokerClientTlsTrustStoreType);
+ }
+ if (brokerClientTlsTrustStore != null) {
+
clusterData.setBrokerClientTlsTrustStore(brokerClientTlsTrustStore);
+ }
+ if (brokerClientTlsTrustStorePassword != null) {
+
clusterData.setBrokerClientTlsTrustStorePassword(brokerClientTlsTrustStorePassword);
+ }
+ if (brokerClientTrustCertsFilePath != null) {
+
clusterData.setBrokerClientTrustCertsFilePath(brokerClientTrustCertsFilePath);
+ }
+ validateClusterData(clusterData);
+ }
+ }
public CmdClusters(Supplier<PulsarAdmin> admin) {
super("clusters", admin);
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
new file mode 100644
index 0000000..7cf17cf
--- /dev/null
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import org.apache.pulsar.client.api.ProxyProtocol;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+import java.io.File;
+import java.nio.file.Files;
+import org.apache.pulsar.admin.cli.utils.CmdUtils;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.client.admin.Clusters;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCmdClusters {
+
+ private PulsarAdmin pulsarAdmin;
+
+ private CmdClusters cmdClusters;
+
+ private Clusters clusters;
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ pulsarAdmin = mock(PulsarAdmin.class);
+ clusters = mock(Clusters.class);
+ when(pulsarAdmin.clusters()).thenReturn(clusters);
+
+ cmdClusters = spy(new CmdClusters(() -> pulsarAdmin));
+ }
+
+ @Test
+ public void testCmdClusterConfigFile() throws Exception {
+ ClusterData clusterData = buildClusterData();
+ testCmdClusterConfigFile(clusterData, clusterData);
+ }
+
+ public void testCmdClusterConfigFile(ClusterData testClusterData,
ClusterData expectedClusterData) throws Exception {
+ File file = Files.createTempFile("tmp_cluster", ".yaml").toFile();
+ new YAMLMapper().writeValue(file, testClusterData);
+ Assert.assertEquals(testClusterData,
CmdUtils.loadConfig(file.getAbsolutePath(), ClusterData.class));
+
+ // test create cluster
+ cmdClusters.run(new String[]{"create", "test_cluster",
"--cluster-config-file", file.getAbsolutePath()});
+ verify(clusters).createCluster(eq("test_cluster"),
eq(expectedClusterData));
+
+ cmdClusters.run(new String[]{"update", "test_cluster",
"--cluster-config-file", file.getAbsolutePath()});
+ verify(clusters).updateCluster(eq("test_cluster"),
eq(expectedClusterData));
+ }
+
+ public ClusterData buildClusterData() {
+ return ClusterData.builder()
+ .serviceUrlTls("https://my-service.url:4443")
+ .authenticationPlugin("authenticationPlugin")
+ .authenticationParameters("authenticationParameters")
+ .proxyProtocol(ProxyProtocol.SNI)
+ .brokerClientTlsEnabled(true)
+ .brokerClientTlsEnabledWithKeyStore(true)
+ .brokerClientTlsTrustStoreType("JKS")
+
.brokerClientTlsTrustStore("/var/private/tls/client.truststore.jks")
+ .brokerClientTlsTrustStorePassword("clientpw")
+ .build();
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index 7d5f497..25551fb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -24,6 +24,12 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.LinkedHashSet;
import java.util.Objects;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.experimental.Accessors;
import org.apache.pulsar.client.api.ProxyProtocol;
/**
@@ -33,6 +39,12 @@ import org.apache.pulsar.client.api.ProxyProtocol;
value = "ClusterData",
description = "The configuration data for a cluster"
)
+@Getter
+@Setter
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Accessors(chain = true)
public class ClusterData {
@ApiModelProperty(
name = "serviceUrl",
@@ -66,14 +78,14 @@ public class ClusterData {
)
private String proxyServiceUrl;
@ApiModelProperty(
- name = "authenticationPlugin",
- value = "Authentication plugin when client would like to connect
to cluster.",
- example = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
+ name = "authenticationPlugin",
+ value = "Authentication plugin when client would like to connect to
cluster.",
+ example = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
)
private String authenticationPlugin;
@ApiModelProperty(
- name = "authenticationParameters",
- value = "Authentication parameters when client would like to
connect to cluster."
+ name = "authenticationParameters",
+ value = "Authentication parameters when client would like to connect
to cluster."
)
private String authenticationParameters;
@ApiModelProperty(
@@ -90,9 +102,48 @@ public class ClusterData {
value = "A set of peer cluster names"
)
private LinkedHashSet<String> peerClusterNames;
-
- public ClusterData() {
- }
+ @ApiModelProperty(
+ name = "brokerClientTlsEnabled",
+ value = "Enable TLS when talking with other brokers in the same
cluster (admin operation)"
+ + " or different clusters (replication)"
+ )
+ private boolean brokerClientTlsEnabled = false;
+ @ApiModelProperty(
+ name = "tlsAllowInsecureConnection",
+ value = "Allow TLS connections to servers whose certificate cannot be"
+ + " be verified to have been signed by a trusted certificate"
+ + " authority."
+ )
+ private boolean tlsAllowInsecureConnection = false;
+ @ApiModelProperty(
+ name = "brokerClientTlsEnabledWithKeyStore",
+ value = "Whether internal client use KeyStore type to authenticate
with other Pulsar brokers"
+ )
+ private boolean brokerClientTlsEnabledWithKeyStore = false;
+ @ApiModelProperty(
+ name = "brokerClientTlsTrustStoreType",
+ value = "TLS TrustStore type configuration for internal client: JKS,
PKCS12"
+ + " used by the internal client to authenticate with Pulsar
brokers",
+ example = "JKS"
+ )
+ private String brokerClientTlsTrustStoreType = "JKS";
+ @ApiModelProperty(
+ name = "brokerClientTlsTrustStore",
+ value = "TLS TrustStore path for internal client"
+ + " used by the internal client to authenticate with Pulsar
brokers"
+ )
+ private String brokerClientTlsTrustStore;
+ @ApiModelProperty(
+ name = "brokerClientTlsTrustStorePassword",
+ value = "TLS TrustStore password for internal client"
+ + " used by the internal client to authenticate with Pulsar
brokers"
+ )
+ private String brokerClientTlsTrustStorePassword;
+ @ApiModelProperty(
+ name = "brokerClientTrustCertsFilePath",
+ value = "Path for the trusted TLS certificate file for outgoing
connection to a server (broker)"
+ )
+ private String brokerClientTrustCertsFilePath;
public ClusterData(String serviceUrl) {
this(serviceUrl, "");
@@ -144,78 +195,13 @@ public class ClusterData {
this.proxyProtocol = other.proxyProtocol;
this.authenticationPlugin = other.authenticationPlugin;
this.authenticationParameters = other.authenticationParameters;
- }
-
- public String getServiceUrl() {
- return serviceUrl;
- }
-
- public String getServiceUrlTls() {
- return serviceUrlTls;
- }
-
- public void setServiceUrl(String serviceUrl) {
- this.serviceUrl = serviceUrl;
- }
-
- public void setServiceUrlTls(String serviceUrlTls) {
- this.serviceUrlTls = serviceUrlTls;
- }
-
- public String getBrokerServiceUrl() {
- return brokerServiceUrl;
- }
-
- public void setBrokerServiceUrl(String brokerServiceUrl) {
- this.brokerServiceUrl = brokerServiceUrl;
- }
-
- public String getBrokerServiceUrlTls() {
- return brokerServiceUrlTls;
- }
-
- public void setBrokerServiceUrlTls(String brokerServiceUrlTls) {
- this.brokerServiceUrlTls = brokerServiceUrlTls;
- }
-
- public String getProxyServiceUrl() {
- return proxyServiceUrl;
- }
-
- public void setProxyServiceUrl(String proxyServiceUrl) {
- this.proxyServiceUrl = proxyServiceUrl;
- }
-
- public ProxyProtocol getProxyProtocol() {
- return proxyProtocol;
- }
-
- public void setProxyProtocol(ProxyProtocol proxyProtocol) {
- this.proxyProtocol = proxyProtocol;
- }
-
- public LinkedHashSet<String> getPeerClusterNames() {
- return peerClusterNames;
- }
-
- public String getAuthenticationPlugin() {
- return authenticationPlugin;
- }
-
- public void setAuthenticationPlugin(String authenticationPlugin) {
- this.authenticationPlugin = authenticationPlugin;
- }
-
- public String getAuthenticationParameters() {
- return authenticationParameters;
- }
-
- public void setAuthenticationParameters(String authenticationParameters) {
- this.authenticationParameters = authenticationParameters;
- }
-
- public void setPeerClusterNames(LinkedHashSet<String> peerClusterNames) {
- this.peerClusterNames = peerClusterNames;
+ this.brokerClientTlsEnabled = other.brokerClientTlsEnabled;
+ this.tlsAllowInsecureConnection = other.tlsAllowInsecureConnection;
+ this.brokerClientTlsEnabledWithKeyStore =
other.brokerClientTlsEnabledWithKeyStore;
+ this.brokerClientTlsTrustStoreType =
other.brokerClientTlsTrustStoreType;
+ this.brokerClientTlsTrustStore = other.brokerClientTlsTrustStore;
+ this.brokerClientTlsTrustStorePassword =
other.brokerClientTlsTrustStorePassword;
+ this.brokerClientTrustCertsFilePath =
other.brokerClientTrustCertsFilePath;
}
@Override
@@ -228,8 +214,14 @@ public class ClusterData {
&& Objects.equals(proxyServiceUrl, other.proxyServiceUrl)
&& Objects.equals(proxyProtocol, other.proxyProtocol)
&& Objects.equals(authenticationPlugin,
other.authenticationPlugin)
- && Objects.equals(authenticationParameters,
other.authenticationParameters);
-
+ && Objects.equals(authenticationParameters,
other.authenticationParameters)
+ && Objects.equals(brokerClientTlsEnabled,
other.brokerClientTlsEnabled)
+ && Objects.equals(tlsAllowInsecureConnection,
other.tlsAllowInsecureConnection)
+ && Objects.equals(brokerClientTlsEnabledWithKeyStore,
other.brokerClientTlsEnabledWithKeyStore)
+ && Objects.equals(brokerClientTlsTrustStoreType,
other.brokerClientTlsTrustStoreType)
+ && Objects.equals(brokerClientTlsTrustStore,
other.brokerClientTlsTrustStore)
+ && Objects.equals(brokerClientTlsTrustStorePassword,
other.brokerClientTlsTrustStorePassword)
+ && Objects.equals(brokerClientTrustCertsFilePath,
other.brokerClientTrustCertsFilePath);
}
return false;
@@ -237,7 +229,7 @@ public class ClusterData {
@Override
public int hashCode() {
- return Objects.hash(this.toString());
+ return Objects.hash(this.toString());
}
@Override
@@ -249,8 +241,17 @@ public class ClusterData {
.add("brokerServiceUrlTls", brokerServiceUrlTls)
.add("proxyServiceUrl", proxyServiceUrl)
.add("proxyProtocol", proxyProtocol)
- .add("peerClusterNames",
peerClusterNames).add("authenticationPlugin", authenticationPlugin)
- .add("authenticationParameters",
authenticationParameters).toString();
+ .add("peerClusterNames", peerClusterNames)
+ .add("authenticationPlugin", authenticationPlugin)
+ .add("authenticationParameters", authenticationParameters)
+ .add("brokerClientTlsEnabled", brokerClientTlsEnabled)
+ .add("tlsAllowInsecureConnection", tlsAllowInsecureConnection)
+ .add("brokerClientTlsEnabledWithKeyStore",
brokerClientTlsEnabledWithKeyStore)
+ .add("brokerClientTlsTrustStoreType",
brokerClientTlsTrustStoreType)
+ .add("brokerClientTlsTrustStore", brokerClientTlsTrustStore)
+ .add("brokerClientTlsTrustStorePassword",
brokerClientTlsTrustStorePassword)
+ .add("brokerClientTrustCertsFilePath",
brokerClientTrustCertsFilePath)
+ .toString();
}
}