This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad17ea1  KAFKA-10556: NPE if sasl.mechanism is unrecognized (#9356)
ad17ea1 is described below

commit ad17ea10890872ddd1264681d61e2c5a40382590
Author: Ron Dagostino <rdagost...@confluent.io>
AuthorDate: Thu Oct 1 04:20:25 2020 -0400

    KAFKA-10556: NPE if sasl.mechanism is unrecognized (#9356)
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../security/authenticator/SaslClientAuthenticator.java   |  6 +++++-
 .../security/authenticator/SaslServerAuthenticator.java   |  5 ++++-
 .../security/authenticator/SaslAuthenticatorTest.java     | 15 ++++++++++++---
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 6d279ac..bba1c43 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -214,7 +214,11 @@ public class SaslClientAuthenticator implements 
Authenticator {
                 String[] mechs = {mechanism};
                 log.debug("Creating SaslClient: 
client={};service={};serviceHostname={};mechs={}",
                     clientPrincipalName, servicePrincipal, host, 
Arrays.toString(mechs));
-                return Sasl.createSaslClient(mechs, clientPrincipalName, 
servicePrincipal, host, configs, callbackHandler);
+                SaslClient retvalSaslClient = Sasl.createSaslClient(mechs, 
clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
+                if (retvalSaslClient == null) {
+                    throw new SaslAuthenticationException("Failed to create 
SaslClient with mechanism " + mechanism);
+                }
+                return retvalSaslClient;
             });
         } catch (PrivilegedActionException e) {
             throw new SaslAuthenticationException("Failed to create SaslClient 
with mechanism " + mechanism, e.getCause());
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index b959d68..20dbf7b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -193,8 +193,11 @@ public class SaslServerAuthenticator implements 
Authenticator {
             try {
                 saslServer = Subject.doAs(subject, 
(PrivilegedExceptionAction<SaslServer>) () ->
                     Sasl.createSaslServer(saslMechanism, "kafka", 
serverAddress().getHostName(), configs, callbackHandler));
+                if (saslServer == null) {
+                    throw new SaslException("Kafka Server failed to create a 
SaslServer to interact with a client during session authentication with server 
mechanism " + saslMechanism);
+                }
             } catch (PrivilegedActionException e) {
-                throw new SaslException("Kafka Server failed to create a 
SaslServer to interact with a client during session authentication", 
e.getCause());
+                throw new SaslException("Kafka Server failed to create a 
SaslServer to interact with a client during session authentication with server 
mechanism " + saslMechanism, e.getCause());
             }
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index ed922b1..5c1ce3c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -1236,9 +1236,18 @@ public class SaslAuthenticatorTest {
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID");
 
         server = createEchoServer(securityProtocol);
-        createAndCheckClientConnectionFailure(securityProtocol, node);
-        server.verifyAuthenticationMetrics(0, 1);
-        server.verifyReauthenticationMetrics(0, 0);
+        try {
+            createAndCheckClientConnectionFailure(securityProtocol, node);
+            fail("Did not generate exception prior to creating channel");
+        } catch (IOException expected) {
+            server.verifyAuthenticationMetrics(0, 0);
+            server.verifyReauthenticationMetrics(0, 0);
+            Throwable underlyingCause = 
expected.getCause().getCause().getCause();
+            assertEquals(SaslAuthenticationException.class, 
underlyingCause.getClass());
+            assertEquals("Failed to create SaslClient with mechanism INVALID", 
underlyingCause.getMessage());
+        } finally {
+            closeClientConnectionIfNecessary();
+        }
     }
 
     /**

Reply via email to