This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a01cf311269 [improve][broker] Split TLS transport encryption support 
from authentication (#16819)
a01cf311269 is described below

commit a01cf311269d195612dc1a40b454918a660c0347
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Aug 2 10:47:19 2022 +0800

    [improve][broker] Split TLS transport encryption support from 
authentication (#16819)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 conf/broker.conf                                   | 29 ++++++-
 conf/standalone.conf                               | 27 +++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 90 ++++++++++++++--------
 .../org/apache/pulsar/broker/PulsarService.java    | 17 +++-
 .../pulsar/broker/service/BrokerService.java       |  9 ++-
 .../client/impl/AdminApiKeyStoreTlsAuthTest.java   | 58 ++++++++++++--
 .../pulsar/client/admin/PulsarAdminBuilder.java    | 40 ++++++++++
 .../admin/internal/PulsarAdminBuilderImpl.java     | 30 ++++++++
 .../admin/internal/http/AsyncHttpConnector.java    | 12 ++-
 9 files changed, 264 insertions(+), 48 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 5216034fdb8..40a5c49d1f5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -681,12 +681,39 @@ tlsTrustStore=
 # TLS TrustStore password in broker, default value is empty password
 tlsTrustStorePassword=
 
+# Whether the internal client uses TLS to connect to the broker
+brokerClientTlsEnabled=false
+
 # Whether internal client use KeyStore type to authenticate with Pulsar brokers
 brokerClientTlsEnabledWithKeyStore=false
 
 # The TLS Provider used by internal client to authenticate with other Pulsar 
brokers
 brokerClientSslProvider=
 
+# TLS trusted certificate file for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTrustCertsFilePath=
+
+# TLS private key file for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientKeyFilePath=
+
+# TLS certificate file for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientCertificateFilePath=
+
+# TLS KeyStore type configuration for internal client: JKS, PKCS12
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsKeyStoreType=JKS
+
+# TLS KeyStore path for internal client
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsKeyStore=
+
+# TLS KeyStore password for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsKeyStorePassword=
+
 # TLS TrustStore type configuration for internal client: JKS, PKCS12
 # used by the internal client to authenticate with Pulsar brokers
 brokerClientTlsTrustStoreType=JKS
@@ -758,10 +785,8 @@ superUserRoles=
 
 # Authentication settings of the broker itself. Used when the broker connects 
to other brokers,
 # either in same or other clusters
-brokerClientTlsEnabled=false
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
-brokerClientTrustCertsFilePath=
 
 # Supported Athenz provider domain names(comma separated) for authentication
 athenzDomainNames=
diff --git a/conf/standalone.conf b/conf/standalone.conf
index b3281c02737..a5450e8e19c 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -413,12 +413,39 @@ tlsTrustStore=
 # TLS TrustStore password for broker
 tlsTrustStorePassword=
 
+# Whether the internal client uses TLS to connect to the broker
+brokerClientTlsEnabled=false
+
 # Whether internal client use KeyStore type to authenticate with Pulsar brokers
 brokerClientTlsEnabledWithKeyStore=false
 
 # The TLS Provider used by internal client to authenticate with other Pulsar 
brokers
 brokerClientSslProvider=
 
+# TLS trusted certificate file for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTrustCertsFilePath=
+
+# TLS private key file for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientKeyFilePath=
+
+# TLS certificate file for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientCertificateFilePath=
+
+# TLS KeyStore type configuration for internal client: JKS, PKCS12
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsKeyStoreType=JKS
+
+# TLS KeyStore path for internal client
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsKeyStore=
+
+# TLS KeyStore password for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsKeyStorePassword=
+
 # TLS TrustStore type configuration for internal client: JKS, PKCS12
 # used by the internal client to authenticate with Pulsar brokers
 brokerClientTlsTrustStoreType=JKS
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 93c79e5398e..74c55cb7d3a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1474,24 +1474,6 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + " or last position eg: *.pulsar.service, pulsar.service.*)")
     private boolean authorizationAllowWildcardsMatching = false;
 
-    @FieldContext(
-        category = CATEGORY_AUTHENTICATION,
-        dynamic = true,
-        doc = "Authentication settings of the broker itself. \n\nUsed when the 
broker connects"
-            + " to other brokers, either in same or other clusters. Default 
uses plugin which disables authentication"
-    )
-    private String brokerClientAuthenticationPlugin = 
"org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
-    @FieldContext(
-        category = CATEGORY_AUTHENTICATION,
-        dynamic = true,
-        doc = "Authentication parameters of the authentication plugin the 
broker is using to connect to other brokers"
-    )
-    private String brokerClientAuthenticationParameters = "";
-    @FieldContext(
-        category = CATEGORY_AUTHENTICATION,
-        doc = "Path for the trusted TLS certificate file for outgoing 
connection to a server (broker)")
-    private String brokerClientTrustCertsFilePath = "";
-
     @FieldContext(
         category = CATEGORY_AUTHORIZATION,
         doc = "When this parameter is not empty, unauthenticated users perform 
as anonymousUserRole"
@@ -2394,14 +2376,6 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "@deprecated - Use brokerClientTlsEnabled instead."
     )
     private boolean replicationTlsEnabled = false;
-    @FieldContext(
-        category = CATEGORY_REPLICATION,
-        dynamic = true,
-        doc = "Enable TLS when talking with other brokers in the same cluster 
(admin operation)"
-            + " or different clusters (replication)"
-    )
-    private boolean brokerClientTlsEnabled = false;
-
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Default message retention time"
@@ -2885,7 +2859,29 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @ToString.Exclude
     private String tlsTrustStorePassword = null;
 
-    /**** --- KeyStore TLS config variables used for internal client/admin to 
auth with other broker. --- ****/
+    /**** --- Config variables used for internal client/admin to auth with 
other broker. --- ****/
+    @FieldContext(
+            category = CATEGORY_AUTHENTICATION,
+            dynamic = true,
+            doc = "Authentication settings of the broker itself. \n\nUsed when 
the broker connects"
+                    + " to other brokers, either in same or other clusters. "
+                    + "Default uses plugin which disables authentication"
+    )
+    private String brokerClientAuthenticationPlugin = 
"org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
+    @FieldContext(
+            category = CATEGORY_AUTHENTICATION,
+            dynamic = true,
+            doc = "Authentication parameters of the authentication plugin the 
broker is using to connect "
+                    + "to other brokers"
+    )
+    private String brokerClientAuthenticationParameters = "";
+    @FieldContext(
+            category = CATEGORY_REPLICATION,
+            dynamic = true,
+            doc = "Enable TLS when talking with other brokers in the same 
cluster (admin operation) "
+                    + "or different clusters (replication)"
+    )
+    private boolean brokerClientTlsEnabled = false;
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
             doc = "Whether internal client use KeyStore type to authenticate 
with other Pulsar brokers"
@@ -2896,17 +2892,32 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             doc = "The TLS Provider used by internal client to authenticate 
with other Pulsar brokers"
     )
     private String brokerClientSslProvider = null;
-    // needed when client auth is required
+    @FieldContext(
+            category = CATEGORY_AUTHENTICATION,
+            doc = "TLS trusted certificate file for internal client, "
+                    + "used by the internal client to authenticate with Pulsar 
brokers")
+    private String brokerClientTrustCertsFilePath = "";
+    @FieldContext(
+            category = CATEGORY_AUTHENTICATION,
+            doc = "TLS private key file for internal client, "
+                    + "used by the internal client to authenticate with Pulsar 
brokers")
+    private String brokerClientKeyFilePath = "";
+    @FieldContext(
+            category = CATEGORY_AUTHENTICATION,
+            doc = "TLS certificate file for internal client, "
+                    + "used by the internal client to authenticate with Pulsar 
brokers"
+    )
+    private String brokerClientCertificateFilePath = "";
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
             doc = "TLS TrustStore type configuration for internal client: JKS, 
PKCS12 "
-                  + " used by the internal client to authenticate with Pulsar 
brokers"
+                    + " used by the internal client to authenticate with 
Pulsar brokers"
     )
     private String brokerClientTlsTrustStoreType = "JKS";
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
             doc = "TLS TrustStore path for internal client, "
-                  + " used by the internal client to authenticate with Pulsar 
brokers"
+                    + " used by the internal client to authenticate with 
Pulsar brokers"
     )
     private String brokerClientTlsTrustStore = null;
     @FieldContext(
@@ -2916,6 +2927,25 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     @ToString.Exclude
     private String brokerClientTlsTrustStorePassword = null;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore type configuration for internal client: JKS, 
PKCS12,"
+                  + " used by the internal client to authenticate with Pulsar 
brokers"
+    )
+    private String brokerClientTlsKeyStoreType = "JKS";
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore path for internal client, "
+                  + " used by the internal client to authenticate with Pulsar 
brokers"
+    )
+    private String brokerClientTlsKeyStore = null;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore password for internal client, "
+                  + " used by the internal client to authenticate with Pulsar 
brokers"
+    )
+    @ToString.Exclude
+    private String brokerClientTlsKeyStorePassword = null;
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
             doc = "Specify the tls cipher the internal client will use to 
negotiate during TLS Handshake"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f0d9e68f380..b5baa717093 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1484,11 +1484,16 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
                         
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
                         
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
+                        
conf.setTlsKeyStoreType(this.getConfiguration().getBrokerClientTlsKeyStoreType());
+                        
conf.setTlsKeyStorePath(this.getConfiguration().getBrokerClientTlsKeyStore());
+                        
conf.setTlsKeyStorePassword(this.getConfiguration().getBrokerClientTlsKeyStorePassword());
                     } else {
                         conf.setTlsTrustCertsFilePath(
                                 
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
                                         ? 
this.getConfiguration().getBrokerClientTrustCertsFilePath()
                                         : 
this.getConfiguration().getTlsCertificateFilePath());
+                        
conf.setTlsKeyFilePath(this.getConfiguration().getBrokerClientKeyFilePath());
+                        
conf.setTlsCertificateFilePath(this.getConfiguration().getBrokerClientCertificateFilePath());
                     }
                 }
 
@@ -1534,12 +1539,16 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                     builder.tlsCiphers(config.getBrokerClientTlsCiphers())
                             
.tlsProtocols(config.getBrokerClientTlsProtocols());
                     if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
-                        builder.useKeyStoreTls(true)
-                                
.tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
+                        
builder.useKeyStoreTls(true).tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
                                 
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
-                                
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
+                                
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
+                                
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
+                                
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
+                                
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
                     } else {
-                        
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+                        
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
+                                
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
+                                
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
                     }
                     
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7f1ad0acf61..0a2e99004a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1291,9 +1291,14 @@ public class BrokerService implements Closeable {
                         builder.useKeyStoreTls(true)
                                 
.tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
                                 
.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
-                                
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
+                                
.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword())
+                                
.tlsKeyStoreType(conf.getBrokerClientTlsKeyStoreType())
+                                
.tlsKeyStorePath(conf.getBrokerClientTlsKeyStore())
+                                
.tlsKeyStorePassword(conf.getBrokerClientTlsKeyStorePassword());
                     } else {
-                        
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+                        
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
+                                
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
+                                
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
                     }
                 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
index b030ffe0a1b..22400e0ae64 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
@@ -20,35 +20,38 @@ package org.apache.pulsar.client.impl;
 
 import static 
org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls.mapToString;
 import static org.testng.AssertJUnit.fail;
-
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-
+import io.jsonwebtoken.SignatureAlgorithm;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
-
+import javax.crypto.SecretKey;
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
-
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.tls.NoopHostnameVerifier;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.tls.NoopHostnameVerifier;
 import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
@@ -57,6 +60,7 @@ import org.glassfish.jersey.media.multipart.MultiPartFeature;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -64,6 +68,8 @@ import org.testng.annotations.Test;
 public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
     private final String clusterName = "test";
     Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+    private final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private final String CLIENTUSER_TOKEN = 
AuthTokenUtils.createToken(SECRET_KEY, "clientuser", Optional.empty());
 
     @BeforeMethod
     @Override
@@ -93,8 +99,13 @@ public class AdminApiKeyStoreTlsAuthTest extends 
ProducerConsumerBase {
         conf.setAuthorizationEnabled(true);
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
+        providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
 
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
         conf.setBrokerClientTlsEnabled(true);
         conf.setBrokerClientTlsEnabledWithKeyStore(true);
 
@@ -216,4 +227,39 @@ public class AdminApiKeyStoreTlsAuthTest extends 
ProducerConsumerBase {
             fail("Should not have thrown an exception");
         }
     }
+
+    private final Authentication tlsAuth =
+            new AuthenticationKeyStoreTls(KEYSTORE_TYPE, 
CLIENT_KEYSTORE_FILE_PATH, CLIENT_KEYSTORE_PW);
+    private final Authentication tokenAuth = new 
AuthenticationToken(CLIENTUSER_TOKEN);
+
+    @DataProvider
+    public Object[] keyStoreTlsTransportWithAuth() {
+        return new Object[]{
+                // Verify JKS TLS transport encryption with TLS authentication
+                tlsAuth,
+                null,
+                // Verify JKS TLS transport encryption with token 
authentication
+                tokenAuth,
+        };
+    }
+
+    @Test(dataProvider = "keyStoreTlsTransportWithAuth")
+    public void testKeyStoreTlsTransportWithAuth(Authentication auth) throws 
Exception {
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrlTls.toString())
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .tlsKeyStorePath(CLIENT_KEYSTORE_FILE_PATH)
+                .tlsKeyStorePassword(CLIENT_KEYSTORE_PW)
+                .authentication(auth)
+                .allowTlsInsecureConnection(false)
+                .build();
+
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+        admin.tenants().createTenant("tenant1",
+                new TenantInfoImpl(ImmutableSet.of("foobar"),
+                        ImmutableSet.of("test")));
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index c685c1f7793..60d4c2dbc71 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -168,6 +168,22 @@ public interface PulsarAdminBuilder {
      */
     PulsarAdminBuilder authentication(Authentication authentication);
 
+    /**
+     * Set the path to the TLS key file.
+     *
+     * @param tlsKeyFilePath
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder tlsKeyFilePath(String tlsKeyFilePath);
+
+    /**
+     * Set the path to the TLS certificate file.
+     *
+     * @param tlsCertificateFilePath
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder tlsCertificateFilePath(String tlsCertificateFilePath);
+
     /**
      * Set the path to the trusted TLS certificate file.
      *
@@ -209,6 +225,30 @@ public interface PulsarAdminBuilder {
      */
     PulsarAdminBuilder sslProvider(String sslProvider);
 
+    /**
+     * The file format of the key store file.
+     *
+     * @param tlsKeyStoreType
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder tlsKeyStoreType(String tlsKeyStoreType);
+
+    /**
+     * The location of the key store file.
+     *
+     * @param tlsTrustStorePath
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder tlsKeyStorePath(String tlsTrustStorePath);
+
+    /**
+     * The store password for the key store file.
+     *
+     * @param tlsKeyStorePassword
+     * @return the admin builder instance
+     */
+    PulsarAdminBuilder tlsKeyStorePassword(String tlsKeyStorePassword);
+
     /**
      * The file format of the trust store file.
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index d86b9e73457..7e7922be6ca 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -95,6 +95,18 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
         return this;
     }
 
+    @Override
+    public PulsarAdminBuilder tlsKeyFilePath(String tlsKeyFilePath) {
+        conf.setTlsKeyFilePath(tlsKeyFilePath);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsCertificateFilePath(String 
tlsCertificateFilePath) {
+        conf.setTlsCertificateFilePath(tlsCertificateFilePath);
+        return this;
+    }
+
     @Override
     public PulsarAdminBuilder tlsTrustCertsFilePath(String 
tlsTrustCertsFilePath) {
         conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
@@ -125,6 +137,24 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
         return this;
     }
 
+    @Override
+    public PulsarAdminBuilder tlsKeyStoreType(String tlsKeyStoreType) {
+        conf.setTlsKeyStoreType(tlsKeyStoreType);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsKeyStorePath(String tlsTrustStorePath) {
+        conf.setTlsKeyStorePath(tlsTrustStorePath);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsKeyStorePassword(String tlsKeyStorePassword) {
+        conf.setTlsKeyStorePassword(tlsKeyStorePassword);
+        return this;
+    }
+
     @Override
     public PulsarAdminBuilder tlsTrustStoreType(String tlsTrustStoreType) {
         conf.setTlsTrustStoreType(tlsTrustStoreType);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 2b08bfc0048..4595d6fd54d 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -122,13 +122,15 @@ public class AsyncHttpConnector implements Connector {
                 AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData();
 
                 if (conf.isUseKeyStoreTls()) {
-                    KeyStoreParams params = authData.hasDataForTls() ? 
authData.getTlsKeyStoreParams() : null;
+                    KeyStoreParams params = authData.hasDataForTls() ? 
authData.getTlsKeyStoreParams() :
+                            new KeyStoreParams(conf.getTlsKeyStoreType(), 
conf.getTlsKeyStorePath(),
+                                    conf.getTlsKeyStorePassword());
 
                     final SSLContext sslCtx = 
KeyStoreSSLContext.createClientSslContext(
                             conf.getSslProvider(),
-                            params != null ? params.getKeyStoreType() : null,
-                            params != null ? params.getKeyStorePath() : null,
-                            params != null ? params.getKeyStorePassword() : 
null,
+                            params.getKeyStoreType(),
+                            params.getKeyStorePath(),
+                            params.getKeyStorePassword(),
                             conf.isTlsAllowInsecureConnection(),
                             conf.getTlsTrustStoreType(),
                             conf.getTlsTrustStorePath(),
@@ -163,6 +165,8 @@ public class AsyncHttpConnector implements Connector {
                                 sslProvider,
                                 conf.isTlsAllowInsecureConnection(),
                                 conf.getTlsTrustCertsFilePath(),
+                                conf.getTlsCertificateFilePath(),
+                                conf.getTlsKeyFilePath(),
                                 conf.getTlsCiphers(),
                                 conf.getTlsProtocols());
                     }

Reply via email to