This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new f0e234d  KAFKA-10798; Ensure response is delayed for failed SASL authentication with connection close delay (#9678)
f0e234d is described below

commit f0e234d8e71a355e848adb40628cbe6f97f35268
Author: Rajini Sivaram <[email protected]>
AuthorDate: Mon Dec 7 16:12:18 2020 +0000

    KAFKA-10798; Ensure response is delayed for failed SASL authentication with connection close delay (#9678)
    
    Reviewers: Manikumar Reddy <[email protected]>
---
 .../authenticator/SaslServerAuthenticator.java     |  2 +-
 .../apache/kafka/common/network/NioEchoServer.java |  2 +-
 .../SaslAuthenticatorFailureDelayTest.java         | 37 ++++++++++++++++++++--
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index ce7bd69..923ddea 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -460,7 +460,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
                     String errorMessage = "Authentication failed during "
                             + reauthInfo.authenticationOrReauthenticationText()
                             + " due to invalid credentials with SASL mechanism 
" + saslMechanism;
-                    sendKafkaResponse(requestContext, new 
SaslAuthenticateResponse(
+                    buildResponseOnAuthenticateFailure(requestContext, new 
SaslAuthenticateResponse(
                             new SaslAuthenticateResponseData()
                             
.setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code())
                             .setErrorMessage(errorMessage)));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index d69b184..5d96d5e 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -95,7 +95,7 @@ public class NioEchoServer extends Thread {
     public NioEchoServer(ListenerName listenerName, SecurityProtocol 
securityProtocol, AbstractConfig config,
                          String serverHost, ChannelBuilder channelBuilder, 
CredentialCache credentialCache,
                          int failedAuthenticationDelayMs, Time time) throws 
Exception {
-        this(listenerName, securityProtocol, config, serverHost, 
channelBuilder, credentialCache, 100, time,
+        this(listenerName, securityProtocol, config, serverHost, 
channelBuilder, credentialCache, failedAuthenticationDelayMs, time,
                 new DelegationTokenCache(ScramMechanism.mechanismNames()));
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
index 599345a..19003ed 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
@@ -45,6 +45,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -55,7 +56,7 @@ import static org.junit.Assert.assertTrue;
 public class SaslAuthenticatorFailureDelayTest {
     private static final int BUFFER_SIZE = 4 * 1024;
 
-    private final MockTime time = new MockTime(10);
+    private final MockTime time = new MockTime(1);
     private NioEchoServer server;
     private Selector selector;
     private ChannelBuilder channelBuilder;
@@ -119,6 +120,38 @@ public class SaslAuthenticatorFailureDelayTest {
     }
 
     /**
+     * Tests that SASL/SCRAM clients with invalid password fail authentication 
with
+     * connection close delay if configured.
+     */
+    @Test
+    public void testInvalidPasswordSaslScram() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("SCRAM-SHA-256", 
Collections.singletonList("SCRAM-SHA-256"));
+        jaasConfig.setClientOptions("SCRAM-SHA-256", TestJaasConfig.USERNAME, 
"invalidpassword");
+
+        server = createEchoServer(securityProtocol);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, 
"SCRAM-SHA-256", null);
+        server.verifyAuthenticationMetrics(0, 1);
+    }
+
+    /**
+     * Tests that clients with disabled SASL mechanism fail authentication with
+     * connection close delay if configured.
+     */
+    @Test
+    public void testDisabledSaslMechanism() throws Exception {
+        String node = "0";
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        TestJaasConfig jaasConfig = configureMechanisms("SCRAM-SHA-256", 
Collections.singletonList("SCRAM-SHA-256"));
+        jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, 
"invalidpassword");
+
+        server = createEchoServer(securityProtocol);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, 
"SCRAM-SHA-256", null);
+        server.verifyAuthenticationMetrics(0, 1);
+    }
+
+    /**
      * Tests client connection close before response for authentication 
failure is sent.
      */
     @Test
@@ -215,7 +248,7 @@ public class SaslAuthenticatorFailureDelayTest {
         Exception exception = finalState.exception();
         assertTrue("Invalid exception class " + exception.getClass(), 
exception instanceof SaslAuthenticationException);
         if (expectedErrorMessage == null)
-            expectedErrorMessage = "Authentication failed due to invalid 
credentials with SASL mechanism " + mechanism;
+            expectedErrorMessage = "Authentication failed during 
authentication due to invalid credentials with SASL mechanism " + mechanism;
         assertEquals(expectedErrorMessage, exception.getMessage());
     }
 

Reply via email to