This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66a1b1b  Issue 3655: Kerberos authentication for proxy (#3997)
66a1b1b is described below

commit 66a1b1b61a23fecbfbc158556434e04bd8f1c7fc
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Apr 9 22:56:38 2019 +0800

    Issue 3655: Kerberos authentication for proxy (#3997)
    
    Fixes #3655
    
    Master Issue: #3491
    
    ### Motivation
    Add support for Kerberos (SASL) authentication in the Pulsar proxy.
    
    ### Modifications
    - Add support for Kerberos authentication in the proxy.
    - Add a unit test.
    
    ### Verifying this change
    Unit tests passed.
---
 pulsar-broker-auth-sasl/pom.xml                    |   7 +
 .../authentication/AuthenticationProviderSasl.java |   7 +-
 .../ProxySaslAuthenticationTest.java               | 289 +++++++++++++++++++++
 .../authentication/SaslAuthenticateTest.java       |   2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   4 +-
 .../client/impl/auth/AuthenticationSasl.java       |   8 +-
 .../pulsar/client/impl/auth/PulsarSaslClient.java  |   7 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   4 +-
 .../org/apache/pulsar/common/api/Commands.java     |   5 +-
 .../apache/pulsar/common/sasl/SaslConstants.java   |  13 +-
 .../pulsar/proxy/server/DirectProxyHandler.java    |  47 +++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  25 +-
 .../pulsar/proxy/server/ProxyConfiguration.java    |  26 +-
 .../pulsar/proxy/server/ProxyConnection.java       | 232 +++++++++++------
 14 files changed, 558 insertions(+), 118 deletions(-)

diff --git a/pulsar-broker-auth-sasl/pom.xml b/pulsar-broker-auth-sasl/pom.xml
index cc19661..77aaf1e 100644
--- a/pulsar-broker-auth-sasl/pom.xml
+++ b/pulsar-broker-auth-sasl/pom.xml
@@ -77,6 +77,13 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-proxy</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
       <artifactId>managed-ledger-original</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
diff --git 
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
 
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index d11a0e1..a43cdb0 100644
--- 
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ 
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.authentication;
 
-import static 
org.apache.pulsar.common.sasl.SaslConstants.JAAS_BROKER_SECTION_NAME;
+import static 
org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
 import static 
org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
 import static org.apache.pulsar.common.sasl.SaslConstants.KINIT_COMMAND;
 
@@ -53,7 +53,7 @@ public class AuthenticationProviderSasl implements 
AuthenticationProvider {
         this.configuration = Maps.newHashMap();
         final String allowedIdsPatternRegExp = 
config.getSaslJaasClientAllowedIds();
         configuration.put(JAAS_CLIENT_ALLOWED_IDS, allowedIdsPatternRegExp);
-        configuration.put(JAAS_BROKER_SECTION_NAME, 
config.getSaslJaasBrokerSectionName());
+        configuration.put(JAAS_SERVER_SECTION_NAME, 
config.getSaslJaasServerSectionName());
         configuration.put(KINIT_COMMAND, config.getKinitCommand());
 
         try {
@@ -63,7 +63,7 @@ public class AuthenticationProviderSasl implements 
AuthenticationProvider {
             throw new IOException(error);
         }
 
-        loginContextName = config.getSaslJaasBrokerSectionName();
+        loginContextName = config.getSaslJaasServerSectionName();
         if (jaasCredentialsContainer == null) {
             log.info("JAAS loginContext is: {}." , loginContextName);
             try {
@@ -101,7 +101,6 @@ public class AuthenticationProviderSasl implements 
AuthenticationProvider {
                                             SocketAddress remoteAddress,
                                             SSLSession sslSession) throws 
AuthenticationException {
         try {
-            new PulsarSaslServer(jaasCredentialsContainer.getSubject(), 
allowedIdsPattern);
             return new SaslAuthenticationState(
                 new SaslAuthenticationDataSource(
                     new 
PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern)));
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
new file mode 100644
index 0000000..9ab711f
--- /dev/null
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.auth.login.Configuration;
+
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+import org.apache.pulsar.proxy.server.ProxyService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
+       private static final Logger log = 
LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
+    private int webServicePort;
+    private int servicePort;
+
+       public static File kdcDir;
+       public static File kerberosWorkDir;
+
+       private static MiniKdc kdc;
+       private static Properties properties;
+
+       private static String localHostname = "localhost";
+
+       @BeforeClass
+       public static void startMiniKdc() throws Exception {
+               kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
+               kerberosWorkDir = 
Files.createTempDirectory("test-kerberos-work-dir").toFile();
+
+               properties = MiniKdc.createConf();
+               kdc = new MiniKdc(properties, kdcDir);
+               kdc.start();
+
+               String principalBrokerNoRealm = "broker/" + localHostname;
+               String principalBroker = "broker/" + localHostname + "@" + 
kdc.getRealm();
+               log.info("principalBroker: " + principalBroker);
+
+               String principalClientNoRealm = "client/" + localHostname;
+               String principalClient = principalClientNoRealm + "@" + 
kdc.getRealm();
+               log.info("principalClient: " + principalClient);
+
+               String principalProxyNoRealm = "proxy/" + localHostname;
+               String principalProxy = principalProxyNoRealm + "@" + 
kdc.getRealm();
+               log.info("principalProxy: " + principalProxy);
+
+               File keytabClient = new File(kerberosWorkDir, 
"pulsarclient.keytab");
+               kdc.createPrincipal(keytabClient, principalClientNoRealm);
+
+               File keytabBroker = new File(kerberosWorkDir, 
"pulsarbroker.keytab");
+               kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);
+
+               File keytabProxy = new File(kerberosWorkDir, 
"pulsarproxy.keytab");
+               kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
+
+               File jaasFile = new File(kerberosWorkDir, "jaas.properties");
+               try (FileWriter writer = new FileWriter(jaasFile)) {
+                       writer.write("\n"
+                               + "PulsarBroker {\n"
+                               + "  
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+                               + "  useKeyTab=true\n"
+                               + "  keyTab=\"" + 
keytabBroker.getAbsolutePath() + "\n"
+                               + "  storeKey=true\n"
+                               + "  useTicketCache=false\n" // won't test 
useTicketCache=true on JUnit tests
+                               + "  principal=\"" + principalBroker + "\";\n"
+                               + "};\n"
+                               + "\n"
+                               + "\n"
+                               + "\n"
+                               + "PulsarProxy{\n"
+                               + "  
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+                               + "  useKeyTab=true\n"
+                               + "  keyTab=\"" + keytabProxy.getAbsolutePath() 
+ "\n"
+                               + "  storeKey=true\n"
+                               + "  useTicketCache=false\n" // won't test 
useTicketCache=true on JUnit tests
+                               + "  principal=\"" + principalProxy + "\";\n"
+                               + "};\n"
+                               + "\n"
+                               + "\n"
+                               + "\n"
+                               + "PulsarClient {\n"
+                               + "  
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+                               + "  useKeyTab=true\n"
+                               + "  keyTab=\"" + 
keytabClient.getAbsolutePath() + "\n"
+                               + "  storeKey=true\n"
+                               + "  useTicketCache=false\n"
+                               + "  principal=\"" + principalClient + "\";\n"
+                               + "};\n"
+                       );
+               }
+
+               File krb5file = new File(kerberosWorkDir, "krb5.properties");
+               try (FileWriter writer = new FileWriter(krb5file)) {
+                       String conf = "[libdefaults]\n"
+                               + " default_realm = " + kdc.getRealm() + "\n"
+                               + " udp_preference_limit = 1\n" // force use TCP
+                               + "\n"
+                               + "\n"
+                               + "[realms]\n"
+                               + " " + kdc.getRealm() + "  = {\n"
+                               + "  kdc = " + kdc.getHost() + ":" + 
kdc.getPort() + "\n"
+                               + " }";
+                       writer.write(conf);
+                       log.info("krb5.properties:\n" + conf);
+               }
+
+               System.setProperty("java.security.auth.login.config", 
jaasFile.getAbsolutePath());
+               System.setProperty("java.security.krb5.properties", 
krb5file.getAbsolutePath());
+               Configuration.getConfiguration().refresh();
+
+               // Client config
+
+               log.info("created AuthenticationSasl");
+       }
+
+       @AfterClass
+       public static void stopMiniKdc() {
+               System.clearProperty("java.security.auth.login.config");
+               System.clearProperty("java.security.krb5.properties");
+               if (kdc != null) {
+                       kdc.stop();
+               }
+               FileUtils.deleteQuietly(kdcDir);
+               FileUtils.deleteQuietly(kerberosWorkDir);
+               Assert.assertFalse(kdcDir.exists());
+               Assert.assertFalse(kerberosWorkDir.exists());
+       }
+
+       @BeforeMethod
+       @Override
+       protected void setup() throws Exception {
+               log.info("-- {} --, start at host: {}", methodName, 
localHostname);
+               webServicePort = PortManager.nextFreePort();
+               servicePort = PortManager.nextFreePort();
+               isTcpLookup = true;
+               conf.setAdvertisedAddress(localHostname);
+               conf.setAuthenticationEnabled(true);
+               conf.setSaslAuthentication(true);
+               conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+               conf.setSaslJaasServerSectionName("PulsarBroker");
+               Set<String> providers = new HashSet<>();
+               providers.add(AuthenticationProviderSasl.class.getName());
+               conf.setAuthenticationProviders(providers);
+               conf.setClusterName("test");
+
+               super.init();
+
+               lookupUrl = new URI("broker://" + "localhost" + ":" + 
BROKER_PORT);
+
+               super.producerBaseSetup();
+               log.info("-- {} --, end.", methodName);
+       }
+
+       @Override
+       @AfterMethod
+       protected void cleanup() throws Exception {
+               super.internalCleanup();
+       }
+
+       @Test
+       void testAuthentication() throws Exception {
+               log.info("-- Starting {} test --", methodName);
+
+               // Step 1: Create Admin Client
+               //updateAdminClient();
+               final String proxyServiceUrl = "pulsar://localhost:" + 
servicePort;
+               // create a client which connects to proxy and pass authData
+               String topicName = "persistent://my-property/my-ns/my-topic1";
+
+               ProxyConfiguration proxyConfig = new ProxyConfiguration();
+               proxyConfig.setAuthenticationEnabled(true);
+               proxyConfig.setServicePort(servicePort);
+               proxyConfig.setWebServicePort(webServicePort);
+               proxyConfig.setBrokerServiceURL("pulsar://localhost:" + 
BROKER_PORT);
+               proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + 
".*");
+               proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
+
+               // proxy connect to broker
+               
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+               /*proxyConfig.setBrokerClientAuthenticationParameters(
+                       "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," 
+
+                               "\"serverType\": " + "\"broker\"}");*/
+               proxyConfig.setBrokerClientAuthenticationParameters(
+                       "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," 
+
+                               "\"serverType\": " + "\"broker\"}");
+
+               // proxy as a server, it will use sasl to authn
+               Set<String> providers = new HashSet<>();
+               providers.add(AuthenticationProviderSasl.class.getName());
+               proxyConfig.setAuthenticationProviders(providers);
+
+               proxyConfig.setForwardAuthorizationCredentials(true);
+               AuthenticationService authenticationService = new 
AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig));
+               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService);
+
+               proxyService.start();
+               log.info("1 proxy service started {}", proxyService);
+
+               // Step 3: Pass correct client params
+               PulsarClient proxyClient = createProxyClient(proxyServiceUrl, 
1);
+               log.info("2 create proxy client {}, {}", proxyServiceUrl, 
proxyClient);
+
+               Producer<byte[]> producer = 
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+               log.info("3 created producer.");
+
+               Consumer<byte[]> consumer = 
proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
+               log.info("4 created consumer.");
+
+               for (int i = 0; i < 10; i++) {
+                       String message = "my-message-" + i;
+                       producer.send(message.getBytes());
+                       log.info("Produced message: [{}]", message);
+               }
+
+               Message<byte[]> msg = null;
+               Set<String> messageSet = Sets.newHashSet();
+               for (int i = 0; i < 10; i++) {
+                       msg = consumer.receive(5, TimeUnit.SECONDS);
+                       String receivedMessage = new String(msg.getData());
+                       log.info("Received message: [{}]", receivedMessage);
+                       String expectedMessage = "my-message-" + i;
+                       testMessageOrderAndDuplicates(messageSet, 
receivedMessage, expectedMessage);
+               }
+               // Acknowledge the consumption of all messages at once
+               consumer.acknowledgeCumulative(msg);
+               consumer.close();
+
+               proxyClient.close();
+               proxyService.close();
+       }
+
+       private PulsarClient createProxyClient(String proxyServiceUrl, int 
numberOfConnections) throws PulsarClientException {
+               Map<String, String> clientSaslConfig = Maps.newHashMap();
+               clientSaslConfig.put("saslJaasClientSectionName", 
"PulsarClient");
+               clientSaslConfig.put("serverType", "proxy");
+               log.info("set client jaas section name: PulsarClient, 
serverType: proxy");
+               Authentication authSasl = 
AuthenticationFactory.create(AuthenticationSasl.class.getName(), 
clientSaslConfig);
+
+               return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+                               
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
+       }
+}
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index 61d2b04..e098bcd 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -156,7 +156,7 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         conf.setAuthenticationEnabled(true);
         conf.setSaslAuthentication(true);
         conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
-        conf.setSaslJaasBrokerSectionName("PulsarBroker");
+        conf.setSaslJaasServerSectionName("PulsarBroker");
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderSasl.class.getName());
         conf.setAuthenticationProviders(providers);
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f7c0f93..b3262c1 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -226,7 +226,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
                     + "(disable default-ttl with value 0)"
         )
     private int ttlDurationDefaultInSeconds = 0;
-    
+
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Enable the deletion of inactive topics"
@@ -625,7 +625,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         category = CATEGORY_SASL_AUTH,
         doc = "Service Principal, for login context name. Default value is 
\"Broker\"."
     )
-    private String saslJaasBrokerSectionName = 
SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;
+    private String saslJaasServerSectionName = 
SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;
 
     @FieldContext(
         category = CATEGORY_SASL_AUTH,
diff --git 
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
 
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index 7b98be3..b8eca39 100644
--- 
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ 
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -53,6 +52,7 @@ public class AuthenticationSasl implements Authentication, 
EncodedAuthentication
 
     private Map<String, String> configuration;
     private String loginContextName;
+    private String serverType = null;
 
     public AuthenticationSasl() {
     }
@@ -63,10 +63,10 @@ public class AuthenticationSasl implements Authentication, 
EncodedAuthentication
     }
 
     @Override
-    public AuthenticationDataProvider getAuthData(String brokerHostName) 
throws PulsarClientException {
+    public AuthenticationDataProvider getAuthData(String serverHostname) 
throws PulsarClientException {
         // reuse this to return a DataProvider which contains a SASL client
         try {
-            PulsarSaslClient saslClient = new PulsarSaslClient(brokerHostName, 
jaasCredentialsContainer.getSubject());
+            PulsarSaslClient saslClient = new PulsarSaslClient(serverHostname, 
serverType, jaasCredentialsContainer.getSubject());
             return new SaslAuthenticationDataProvider(saslClient);
         } catch (Throwable t) {
             log.error("Failed create sasl client: {}", t);
@@ -105,6 +105,8 @@ public class AuthenticationSasl implements Authentication, 
EncodedAuthentication
         // read section from config files of kerberos
         this.loginContextName = authParams
             .getOrDefault(SaslConstants.JAAS_CLIENT_SECTION_NAME, 
SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+        this.serverType = authParams
+            .getOrDefault(SaslConstants.SASL_SERVER_TYPE, 
SaslConstants.SASL_BROKER_PROTOCOL);
 
         // init the static jaasCredentialsContainer that shares amongst client.
         if (!initializedJAAS) {
diff --git 
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
 
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
index 23399a1..59ee47c5e 100644
--- 
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
+++ 
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
@@ -49,11 +49,14 @@ public class PulsarSaslClient {
     private final SaslClient saslClient;
     private final Subject clientSubject;
 
-    public PulsarSaslClient(String serverHostname, Subject subject) throws 
SaslException {
+    public PulsarSaslClient(String serverHostname, String serverType, Subject 
subject) throws SaslException {
         checkArgument(subject != null, "Cannot create SASL client with NULL 
JAAS subject");
         checkArgument(!Strings.isNullOrEmpty(serverHostname), "Cannot create 
SASL client with NUll server name");
+        
checkArgument(serverType.equalsIgnoreCase(SaslConstants.SASL_BROKER_PROTOCOL) ||
+                serverType.equalsIgnoreCase(SaslConstants.SASL_PROXY_PROTOCOL),
+            "Server type [" + serverType + "] invalid, should be broker or 
proxy");
 
-        String serverPrincipal = SaslConstants.SASL_PULSAR_PROTOCOL + "/" + 
serverHostname;
+        String serverPrincipal = serverType.toLowerCase() + "/" + 
serverHostname;
         this.clientSubject = subject;
         if (clientSubject.getPrincipals().isEmpty()) {
             throw new SaslException("Cannot create SASL client with empty JAAS 
subject principal");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 948c027..2921777 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -123,14 +123,14 @@ public class ClientCnx extends PulsarHandler {
 
     protected String proxyToTargetBrokerAddress = null;
     // Remote hostName with which client is connected
-    private String remoteHostName = null;
+    protected String remoteHostName = null;
     private boolean isTlsHostnameVerificationEnable;
     private DefaultHostnameVerifier hostnameVerifier;
 
     private final ScheduledFuture<?> timeoutTask;
 
     // Added for mutual authentication.
-    private AuthenticationDataProvider authenticationDataProvider;
+    protected AuthenticationDataProvider authenticationDataProvider;
 
     enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 0481e18..3692e84 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.api;
 
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom;
 import static 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8;
 
@@ -154,7 +155,7 @@ public class Commands {
     }
 
     public static ByteBuf newConnect(String authMethodName, AuthData authData, 
int protocolVersion, String libVersion,
-                                     String targetBroker, String 
originalPrincipal, String originalAuthData,
+                                     String targetBroker, String 
originalPrincipal, AuthData originalAuthData,
                                      String originalAuthMethod) {
         CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
         connectBuilder.setClientVersion(libVersion != null ? libVersion : 
"Pulsar Client");
@@ -174,7 +175,7 @@ public class Commands {
         }
 
         if (originalAuthData != null) {
-            connectBuilder.setOriginalAuthData(originalAuthData);
+            connectBuilder.setOriginalAuthData(new 
String(originalAuthData.getBytes(), UTF_8));
         }
 
         if (originalAuthMethod != null) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
index 749d411..b2e3c3c 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
@@ -29,13 +29,9 @@ public class SaslConstants {
 
     public static final String AUTH_METHOD_NAME = "sasl";
 
-    // service broker Principal
-    public static final String JAAS_BROKER_SECTION_NAME = 
"saslJaasBrokerSectionName";
+    // service section name, this is broker or proxy Principal
+    public static final String JAAS_SERVER_SECTION_NAME = 
"saslJaasServerSectionName";
     public static final String JAAS_DEFAULT_BROKER_SECTION_NAME = 
"PulsarBroker";
-
-    //TODO: for sasl proxy.
-    // github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
-    public static final String JAAS_PROXY_SECTION_NAME = 
"saslJaasProxySectionName";
     public static final String JAAS_DEFAULT_PROXY_SECTION_NAME = "PulsarProxy";
 
     // Client principal
@@ -53,8 +49,11 @@ public class SaslConstants {
 
     public static final String KINIT_COMMAND = "kerberos.kinit";
 
+    // The sasl server type that client will connect to. default value broker, 
could also be proxy.
+    public static final String SASL_SERVER_TYPE = "serverType";
     // The non-null string name of the protocol for which the authentication 
is being performed (e.g., "ldap").
-    public static final String SASL_PULSAR_PROTOCOL = "broker";
+    public static final String SASL_BROKER_PROTOCOL = "broker";
+    public static final String SASL_PROXY_PROTOCOL = "proxy";
     // The non-null fully-qualified host name of the server to authenticate to.
     public static final String SASL_PULSAR_REALM = "EXAMPLE.COM";
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 0f96845..bfe3485 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -19,15 +19,23 @@
 
 package org.apache.pulsar.proxy.server;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.net.URI;
 import java.net.URISyntaxException;
 
 import javax.net.ssl.SSLSession;
 
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,20 +55,20 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
-import io.prometheus.client.Counter;
 
 public class DirectProxyHandler {
 
     private Channel inboundChannel;
     Channel outboundChannel;
     private String originalPrincipal;
-    private String clientAuthData;
+    private AuthData clientAuthData;
     private String clientAuthMethod;
     private int protocolVersion;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
     private final SslContext sslCtx;
+    private AuthenticationDataProvider authenticationDataProvider;
 
     public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection, String targetBrokerUrl,
             int protocolVersion, SslContext sslCtx) {
@@ -138,10 +146,8 @@ public class DirectProxyHandler {
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
             this.ctx = ctx;
             // Send the Connect command to broker
-            String authData = "";
-            if (authentication.getAuthData().hasDataFromCommand()) {
-                authData = authentication.getAuthData().getCommandData();
-            }
+            authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
+            AuthData authData = 
authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
             ByteBuf command = null;
             command = Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, 
clientAuthData, clientAuthMethod);
@@ -177,6 +183,35 @@ public class DirectProxyHandler {
         }
 
         @Override
+        protected void handleAuthChallenge(CommandAuthChallenge authChallenge) 
{
+            checkArgument(authChallenge.hasChallenge());
+            checkArgument(authChallenge.getChallenge().hasAuthData() && 
authChallenge.getChallenge().hasAuthData());
+
+            // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
+            try {
+                AuthData authData = authenticationDataProvider
+                    
.authenticate(AuthData.of(authChallenge.getChallenge().getAuthData().toByteArray()));
+
+                checkState(!authData.isComplete());
+
+                ByteBuf request = 
Commands.newAuthResponse(authentication.getAuthMethodName(),
+                    authData,
+                    this.protocolVersion,
+                    PulsarVersion.getVersion());
+
+                if (log.isDebugEnabled()) {
+                    log.debug("{} Mutual auth {}", ctx.channel(), 
authentication.getAuthMethodName());
+                }
+
+                outboundChannel.writeAndFlush(request);
+                outboundChannel.read();
+            } catch (Exception e) {
+                log.error("Error mutual verify: {}", e);
+                return;
+            }
+        }
+
+        @Override
         public void operationComplete(Future<Void> future) throws Exception {
             // This is invoked when the write operation on the paired connection
             // is completed
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 91e7345..624391e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -22,9 +22,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 
 import org.apache.pulsar.PulsarVersion;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,12 +32,12 @@ import org.slf4j.LoggerFactory;
 public class ProxyClientCnx extends ClientCnx {
 
     String clientAuthRole;
-    String clientAuthData;
+    AuthData clientAuthData;
     String clientAuthMethod;
     int protocolVersion;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, String clientAuthRole,
-            String clientAuthData, String clientAuthMethod, int 
protocolVersion) {
+                          AuthData clientAuthData, String clientAuthMethod, 
int protocolVersion) {
         super(conf, eventLoopGroup);
         this.clientAuthRole = clientAuthRole;
         this.clientAuthData = clientAuthData;
@@ -46,19 +46,18 @@ public class ProxyClientCnx extends ClientCnx {
     }
 
     @Override
-    protected ByteBuf newConnectCommand() throws PulsarClientException {
+    protected ByteBuf newConnectCommand() throws Exception {
         if (log.isDebugEnabled()) {
-            log.debug(
-                    "New Connection opened via ProxyClientCnx with params 
clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
+            log.debug("New Connection opened via ProxyClientCnx with params 
clientAuthRole = {}," +
+                    " clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
         }
-        String authData = null;
-        if (authentication.getAuthData().hasDataFromCommand()) {
-            authData = authentication.getAuthData().getCommandData();
-        }
-        return Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion,
-                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
-                clientAuthMethod);
+
+        authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
+        AuthData authData = 
authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
+            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
+            clientAuthMethod);
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ProxyClientCnx.class);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 37be744..7b9a2c5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PropertiesContext;
 import org.apache.pulsar.common.configuration.PropertyContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.common.sasl.SaslConstants;
 
 @Getter
 @Setter
@@ -62,6 +63,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
     private static final String CATEGORY_TOKEN_AUTH = "Token Authentication 
Provider";
     @Category
     private static final String CATEGORY_HTTP = "HTTP";
+    @Category
+    private static final String CATEGORY_SASL_AUTH = "SASL Authentication 
Provider";
 
     @FieldContext(
         category = CATEGORY_BROKER_DISCOVERY,
@@ -186,6 +189,27 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private boolean forwardAuthorizationCredentials = false;
 
+
+    @FieldContext(
+        category = CATEGORY_SASL_AUTH,
+        doc = "This is a regexp, which limits the range of possible ids which 
can connect to the Broker using SASL.\n"
+            + " Default value is: \".*pulsar.*\", so only clients whose id 
contains 'pulsar' are allowed to connect."
+    )
+    private String saslJaasClientAllowedIds = 
SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT;
+
+    @FieldContext(
+        category = CATEGORY_SASL_AUTH,
+        doc = "Service Principal, for login context name. Default value is 
\"PulsarProxy\"."
+    )
+    private String saslJaasServerSectionName = 
SaslConstants.JAAS_DEFAULT_PROXY_SECTION_NAME;
+
+    @FieldContext(
+        category = CATEGORY_SASL_AUTH,
+        doc = "kerberos kinit command."
+    )
+    private String kinitCommand = "/usr/bin/kinit";
+
+
     @FieldContext(
         category = CATEGORY_RATE_LIMITING,
         doc = "Max concurrent inbound connections. The proxy will reject 
requests beyond that"
@@ -408,4 +432,4 @@ public class ProxyConfiguration implements PulsarConfiguration {
             return String.format("HttpReverseProxyConfig(%s, path=%s, 
proxyTo=%s)", name, path, proxyTo);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 90b979e..a1147f6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -23,21 +23,27 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 
+import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarChannelInitializer;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
@@ -70,12 +76,24 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     private LookupProxyHandler lookupProxyHandler = null;
     private DirectProxyHandler directProxyHandler = null;
     String clientAuthRole;
-    String clientAuthData;
+    AuthData clientAuthData;
     String clientAuthMethod;
 
+    private String authMethod = "none";
+    AuthenticationProvider authenticationProvider;
+    AuthenticationState authState;
+    private ClientConfigurationData clientConf;
+    private boolean hasProxyToBrokerUrl;
+    private int protocolVersionToAdvertise;
+    private String proxyToBrokerUrl;
+
     enum State {
         Init,
 
+        // Connecting between user client and proxy server.
+        // Mutual authn needs verify between client and proxy server several times.
+        Connecting,
+
         // Proxy the lookup requests to a random broker
         // Follow redirects
         ProxyLookupRequests,
@@ -150,6 +168,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     public void channelRead(final ChannelHandlerContext ctx, Object msg) 
throws Exception {
         switch (state) {
         case Init:
+        case Connecting:
         case ProxyLookupRequests:
             // Do the regular decoding for the Connected message
             super.channelRead(ctx, msg);
@@ -182,54 +201,160 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
     }
 
-    /**
-     * handles connect request and sends {@code State.Connected} ack to client
-     */
+    private void completeConnect() {
+        LOG.info("[{}] complete connection, init proxy handler. authenticated 
with {} role {}, hasProxyToBrokerUrl: {}",
+            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
+        if (hasProxyToBrokerUrl) {
+            // Client already knows which broker to connect. Let's open a
+            // connection there and just pass bytes in both directions
+            state = State.ProxyConnectionToBroker;
+            directProxyHandler = new DirectProxyHandler(service, this, 
proxyToBrokerUrl,
+                protocolVersionToAdvertise, sslCtx);
+            cancelKeepAliveTask();
+        } else {
+            // Client is doing a lookup, we can consider the handshake complete
+            // and we'll take care of just topics and
+            // partitions metadata lookups
+            state = State.ProxyLookupRequests;
+            lookupProxyHandler = new LookupProxyHandler(service, this);
+            
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
+        }
+    }
+
+    private void createClientAndCompleteConnect(AuthData clientData)
+        throws PulsarClientException {
+        if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+            this.clientAuthData = clientData;
+            this.clientAuthMethod = authMethod;
+        }
+        this.client = createClient(clientConf, this.clientAuthData, 
this.clientAuthMethod, protocolVersionToAdvertise);
+
+        completeConnect();
+    }
+
+    // According to auth result, send newConnected or newAuthChallenge command.
+    private void doAuthentication(AuthData clientData) throws Exception {
+        AuthData brokerData = authState.authenticate(clientData);
+        // authentication has completed, will send newConnected command.
+        if (authState.isComplete()) {
+            clientAuthRole = authState.getAuthRole();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Client successfully authenticated with {} role 
{}",
+                    remoteAddress, authMethod, clientAuthRole);
+            }
+            createClientAndCompleteConnect(clientData);
+            return;
+        }
+
+        // auth not complete, continue auth with client side.
+        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, 
protocolVersionToAdvertise));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("[{}] Authentication in progress client by method {}.",
+                remoteAddress, authMethod);
+        }
+        state = State.Connecting;
+        return;
+    }
+
     @Override
     protected void handleConnect(CommandConnect connect) {
         checkArgument(state == State.Init);
-        remoteEndpointProtocolVersion = connect.getProtocolVersion();
+        this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
+        this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
+        this.protocolVersionToAdvertise = 
getProtocolVersionToAdvertise(connect);
+        this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? 
connect.getProxyToBrokerUrl() : "null";
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Received CONNECT from {} proxyToBroker={}", 
remoteAddress,
-                    connect.hasProxyToBrokerUrl() ? 
connect.getProxyToBrokerUrl() : "null");
+            LOG.debug("Received CONNECT from {} proxyToBroker={}", 
remoteAddress, proxyToBrokerUrl);
+            LOG.debug(
+                "[{}] Protocol version to advertise to broker is {}, 
clientProtocolVersion={}, proxyProtocolVersion={}",
+                remoteAddress, protocolVersionToAdvertise, 
remoteEndpointProtocolVersion,
+                Commands.getCurrentProtocolVersion());
         }
 
-        // Client need to do some minimal cooperation logic.
         if (remoteEndpointProtocolVersion < 
PulsarApi.ProtocolVersion.v10_VALUE) {
             LOG.warn("[{}] Client doesn't support connecting through proxy", 
remoteAddress);
             ctx.close();
             return;
         }
 
-        int protocolVersionToAdvertise = 
getProtocolVersionToAdvertise(connect);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                    "[{}] Protocol version to advertise to broker is {}, 
clientProtocolVersion={}, proxyProtocolVersion={}",
-                    remoteAddress, protocolVersionToAdvertise, 
remoteEndpointProtocolVersion,
-                    Commands.getCurrentProtocolVersion());
-        }
+        try {
+            // init authn
+            this.clientConf = createClientConfiguration();
+            this.clientAuthentication = clientConf.getAuthentication();
+            int protocolVersion = getProtocolVersionToAdvertise(connect);
+
+            // authn not enabled, complete
+            if (!service.getConfiguration().isAuthenticationEnabled()) {
+                this.client = new PulsarClientImpl(clientConf, 
service.getWorkerGroup(),
+                    new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(),
+                        () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)));
+
+                completeConnect();
+                return;
+            }
+
+            AuthData clientData = 
AuthData.of(connect.getAuthData().toByteArray());
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            authenticationProvider = service
+                .getAuthenticationService()
+                .getAuthenticationProvider(authMethod);
+
+            // Not find provider named authMethod. Most used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".
+            if (authenticationProvider == null) {
+                clientAuthRole = 
service.getAuthenticationService().getAnonymousUserRole()
+                    .orElseThrow(() ->
+                        new AuthenticationException("No anonymous role, and no 
authentication provider configured"));
 
-        if (!authenticateAndCreateClient(connect)) {
+                createClientAndCompleteConnect(clientData);
+                return;
+            }
+
+            // init authState and other var
+            ChannelHandler sslHandler = 
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
+            SSLSession sslSession = null;
+            if (sslHandler != null) {
+                sslSession = ((SslHandler) sslHandler).engine().getSession();
+            }
+
+            authState = authenticationProvider.newAuthState(clientData, 
remoteAddress, sslSession);
+            doAuthentication(clientData);
+        } catch (Exception e) {
+            LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
             ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, "Failed to authenticate"));
             close();
             return;
         }
+    }
 
-        if (connect.hasProxyToBrokerUrl()) {
-            // Client already knows which broker to connect. Let's open a
-            // connection
-            // there and just pass bytes in both directions
-            state = State.ProxyConnectionToBroker;
-            directProxyHandler = new DirectProxyHandler(service, this, 
connect.getProxyToBrokerUrl(),
-                    protocolVersionToAdvertise, sslCtx);
-            cancelKeepAliveTask();
-        } else {
-            // Client is doing a lookup, we can consider the handshake complete
-            // and we'll take care of just topics and
-            // partitions metadata lookups
-            state = State.ProxyLookupRequests;
-            lookupProxyHandler = new LookupProxyHandler(service, this);
-            
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
+    @Override
+    protected void handleAuthResponse(CommandAuthResponse authResponse) {
+        checkArgument(state == State.Connecting);
+        checkArgument(authResponse.hasResponse());
+        checkArgument(authResponse.getResponse().hasAuthData() && 
authResponse.getResponse().hasAuthMethodName());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received AuthResponse from {}, auth method: {}",
+                remoteAddress, authResponse.getResponse().getAuthMethodName());
+        }
+
+        try {
+            AuthData clientData = 
AuthData.of(authResponse.getResponse().getAuthData().toByteArray());
+            doAuthentication(clientData);
+        } catch (Exception e) {
+            String msg = "Unable to handleAuthResponse";
+            LOG.warn("[{}] {} ", remoteAddress, msg, e);
+            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, msg));
+            close();
         }
     }
 
@@ -282,50 +407,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return clientConf;
     }
 
-    private boolean authenticateAndCreateClient(CommandConnect connect) {
-        try {
-            ClientConfigurationData clientConf = createClientConfiguration();
-            this.clientAuthentication = clientConf.getAuthentication();
-
-            final int protocolVersion = getProtocolVersionToAdvertise(connect);
-            if (!service.getConfiguration().isAuthenticationEnabled()) {
-                this.client = new PulsarClientImpl(clientConf, 
service.getWorkerGroup(),
-                        new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(),
-                                () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)));
-                return true;
-            }
-
-            String authMethod = "none";
-            if (connect.hasAuthMethodName()) {
-                authMethod = connect.getAuthMethodName();
-            } else if (connect.hasAuthMethod()) {
-                // Legacy client is passing enum
-                authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
-            }
-            String authData = connect.getAuthData().toStringUtf8();
-            ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
-            SSLSession sslSession = null;
-            if (sslHandler != null) {
-                sslSession = ((SslHandler) sslHandler).engine().getSession();
-            }
-            authenticationData = new AuthenticationDataCommand(authData, 
remoteAddress, sslSession);
-            clientAuthRole = 
service.getAuthenticationService().authenticate(authenticationData, authMethod);
-            LOG.info("[{}] Client successfully authenticated with {} role {}", 
remoteAddress, authMethod,
-                    clientAuthRole);
-            if 
(service.getConfiguration().isForwardAuthorizationCredentials()) {
-                this.clientAuthData = authData;
-                this.clientAuthMethod = authMethod;
-            }
-            this.client = createClient(clientConf, this.clientAuthData, 
this.clientAuthMethod, protocolVersion);
-
-            return true;
-        } catch (Exception e) {
-            LOG.warn("[{}] Unable to authenticate: {}", remoteAddress, 
e.getMessage());
-            return false;
-        }
-    }
-
-    private PulsarClientImpl createClient(final ClientConfigurationData 
clientConf, final String clientAuthData,
+    private PulsarClientImpl createClient(final ClientConfigurationData 
clientConf, final AuthData clientAuthData,
             final String clientAuthMethod, final int protocolVersion) throws 
PulsarClientException {
         return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
                 new ProxyConnectionPool(clientConf, service.getWorkerGroup(), 
() -> new ProxyClientCnx(clientConf,

Reply via email to