This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ed6bfcdd799617ec036993236135c42d40c9f901
Author: Rajini Sivaram <[email protected]>
AuthorDate: Mon Nov 23 09:04:16 2020 +0000

    KAFKA-10727; Handle Kerberos error during re-login as transient failure in 
clients (#9605)
    
    We use a background thread for Kerberos to perform re-login before tickets 
expire. The thread performs logout() followed by login(), relying on the Java 
library to clear and then populate credentials in Subject. This leaves a timing 
window where clients fail to authenticate because credentials are not 
available. We cannot introduce any form of locking since authentication is 
performed on the network thread. So this commit treats NO_CRED as a transient 
failure rather than a fatal authentication failure in clients.
    
    Reviewers: Ron Dagostino <[email protected]>, Manikumar Reddy 
<[email protected]>
---
 .../kafka/common/network/SaslChannelBuilder.java   |   2 +-
 .../authenticator/SaslClientAuthenticator.java     |   8 +-
 .../common/security/kerberos/KerberosError.java    |  19 ++++
 .../common/security/kerberos/KerberosLogin.java    |   9 +-
 .../kafka/server/GssapiAuthenticationTest.scala    | 107 ++++++++++++++++++---
 5 files changed, 126 insertions(+), 19 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index a29148a..f01c4ef 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -327,7 +327,7 @@ public class SaslChannelBuilder implements ChannelBuilder, 
ListenerReconfigurabl
         }
     }
 
-    private Class<? extends Login> defaultLoginClass() {
+    protected Class<? extends Login> defaultLoginClass() {
         if (jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM))
             return KerberosLogin.class;
         if 
(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(clientSaslMechanism))
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 8b32e81..10b9907 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -529,15 +529,17 @@ public class SaslClientAuthenticator implements 
Authenticator {
                     " Users must configure FQDN of kafka brokers when 
authenticating using SASL and" +
                     " `socketChannel.socket().getInetAddress().getHostName()` 
must match the hostname in `principal/hostname@realm`";
             }
-            error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
             //Unwrap the SaslException inside `PrivilegedActionException`
             Throwable cause = e.getCause();
             // Treat transient Kerberos errors as non-fatal SaslExceptions 
that are processed as I/O exceptions
             // and all other failures as fatal SaslAuthenticationException.
-            if (kerberosError != null && kerberosError.retriable())
+            if ((kerberosError != null && kerberosError.retriable()) || 
(kerberosError == null && KerberosError.isRetriableClientGssException(e))) {
+                error += " Kafka Client will retry.";
                 throw new SaslException(error, cause);
-            else
+            } else {
+                error += " Kafka Client will go to AUTHENTICATION_FAILED 
state.";
                 throw new SaslAuthenticationException(error, cause);
+            }
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
index 9c76482..4b8e8e0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosError.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.kerberos;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.utils.Java;
+import org.ietf.jgss.GSSException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,4 +110,22 @@ public enum KerberosError {
         }
         return null;
     }
+
+    /**
+     * Returns true if the exception should be handled as a transient failure 
on clients.
+     * We handle GSSException.NO_CRED as retriable on the client-side since 
this may
+     * occur during re-login if a client attempts to authenticate after 
logout, but
+     * before the subsequent login.
+     */
+    public static boolean isRetriableClientGssException(Exception exception) {
+        Throwable cause = exception.getCause();
+        while (cause != null && !(cause instanceof GSSException)) {
+            cause = cause.getCause();
+        }
+        if (cause != null) {
+            GSSException gssException = (GSSException) cause;
+            return gssException.getMajor() == GSSException.NO_CRED;
+        }
+        return false;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 2faf6be..f39f35c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -344,7 +344,7 @@ public class KerberosLogin extends AbstractLogin {
      * Re-login a principal. This method assumes that {@link #login()} has 
happened already.
      * @throws javax.security.auth.login.LoginException on a failure
      */
-    private void reLogin() throws LoginException {
+    protected void reLogin() throws LoginException {
         if (!isKrbTicket) {
             return;
         }
@@ -361,7 +361,7 @@ public class KerberosLogin extends AbstractLogin {
             //clear up the kerberos state. But the tokens are not cleared! As 
per
             //the Java kerberos login module code, only the kerberos 
credentials
             //are cleared
-            loginContext.logout();
+            logout();
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext 
constructor)
             loginContext = new LoginContext(contextName(), subject, null, 
configuration());
@@ -370,6 +370,11 @@ public class KerberosLogin extends AbstractLogin {
         }
     }
 
+    // Visibility to override for testing
+    protected void logout() throws LoginException {
+        loginContext.logout();
+    }
+
     private long currentElapsedTime() {
         return time.hiResClockMs();
     }
diff --git 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index d6414c0..fa21a94 100644
--- 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -20,8 +20,8 @@ package kafka.server
 
 import java.net.InetSocketAddress
 import java.time.Duration
-import java.util.Properties
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.{Collections, Properties}
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 
 import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
 import kafka.utils.TestUtils
@@ -31,7 +31,8 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
+import org.apache.kafka.common.security.kerberos.KerberosLogin
 import org.apache.kafka.common.utils.{LogContext, MockTime}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -57,6 +58,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness 
with SaslSetup {
 
   @Before
   override def setUp(): Unit = {
+    TestableKerberosLogin.reset()
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Option(kafkaClientSaslMechanism), Both))
     serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
     serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp, 
failedAuthenticationDelayMs.toString)
@@ -78,6 +80,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness 
with SaslSetup {
     executor.shutdownNow()
     super.tearDown()
     closeSasl()
+    TestableKerberosLogin.reset()
   }
 
   /**
@@ -97,6 +100,35 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
   }
 
   /**
+   * Verifies that there are no authentication failures during Kerberos 
re-login. If authentication
+   * is performed when credentials are unavailable between logout and login, 
we handle it as a
+   * transient error and not an authentication failure so that clients may 
retry.
+   */
+  @Test
+  def testReLogin(): Unit = {
+    val selector = createSelectorWithRelogin()
+    try {
+      val login = TestableKerberosLogin.instance
+      assertNotNull(login)
+      executor.submit(() => login.reLogin(), 0)
+
+      val node1 = "1"
+      selector.connect(node1, serverAddr, 1024, 1024)
+      login.logoutResumeLatch.countDown()
+      login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
+      assertFalse("Authenticated during re-login", 
pollUntilReadyOrDisconnected(selector, node1))
+
+      login.reLoginResumeLatch.countDown()
+      login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
+      val node2 = "2"
+      selector.connect(node2, serverAddr, 1024, 1024)
+      assertTrue("Authenticated failed after re-login", 
pollUntilReadyOrDisconnected(selector, node2))
+    } finally {
+      selector.close()
+    }
+  }
+
+  /**
    * Tests that Kerberos error `Server not found in Kerberos database (7)` is 
handled
    * as a fatal authentication failure.
    */
@@ -149,16 +181,8 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
       while (actualSuccessfulAuths < numSuccessfulAuths) {
         val nodeId = actualSuccessfulAuths.toString
         selector.connect(nodeId, serverAddr, 1024, 1024)
-        TestUtils.waitUntilTrue(() => {
-          selector.poll(100)
-          val disconnectState = selector.disconnected().get(nodeId)
-          // Verify that disconnect state is not AUTHENTICATION_FAILED
-          if (disconnectState != null)
-            assertEquals(s"Authentication failed with exception 
${disconnectState.exception()}",
-              ChannelState.State.AUTHENTICATE, disconnectState.state())
-          selector.isChannelReady(nodeId) || disconnectState != null
-        }, "Client not ready or disconnected within timeout")
-        if (selector.isChannelReady(nodeId))
+        val isReady = pollUntilReadyOrDisconnected(selector, nodeId)
+        if (isReady)
           actualSuccessfulAuths += 1
         selector.close(nodeId)
       }
@@ -167,6 +191,22 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
     }
   }
 
+  private def pollUntilReadyOrDisconnected(selector: Selector, nodeId: 
String): Boolean = {
+    TestUtils.waitUntilTrue(() => {
+      selector.poll(100)
+      val disconnectState = selector.disconnected().get(nodeId)
+      // Verify that disconnect state is not AUTHENTICATION_FAILED
+      if (disconnectState != null) {
+        assertEquals(s"Authentication failed with exception 
${disconnectState.exception()}",
+          ChannelState.State.AUTHENTICATE, disconnectState.state())
+      }
+      selector.isChannelReady(nodeId) || disconnectState != null
+    }, "Client not ready or disconnected within timeout")
+    val isReady = selector.isChannelReady(nodeId)
+    selector.close(nodeId)
+    isReady
+  }
+
   /**
    * Verifies that authentication with the current `clientConfig` results in 
disconnection and that
    * the disconnection is notified with disconnect state 
`AUTHENTICATION_FAILED`. This is to ensure
@@ -192,4 +232,45 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
       time, true, new LogContext())
     NetworkTestUtils.createSelector(channelBuilder, time)
   }
+
+  private def createSelectorWithRelogin(): Selector = {
+    
clientConfig.setProperty(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "0")
+    val config = new TestSecurityConfig(clientConfig)
+    val jaasContexts = Collections.singletonMap("GSSAPI", 
JaasContext.loadClientContext(config.values()))
+    val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, 
securityProtocol,
+      null, false, kafkaClientSaslMechanism, true, null, null, time, new 
LogContext()) {
+      override protected def defaultLoginClass(): Class[_ <: Login] = 
classOf[TestableKerberosLogin]
+    }
+    channelBuilder.configure(config.values())
+    NetworkTestUtils.createSelector(channelBuilder, time)
+  }
+}
+
+object TestableKerberosLogin {
+  @volatile var instance: TestableKerberosLogin = _
+  def reset(): Unit = {
+    instance = null
+  }
+}
+
+class TestableKerberosLogin extends KerberosLogin {
+  val logoutResumeLatch = new CountDownLatch(1)
+  val logoutCompleteLatch = new CountDownLatch(1)
+  val reLoginResumeLatch = new CountDownLatch(1)
+  val reLoginCompleteLatch = new CountDownLatch(1)
+
+  assertNull(TestableKerberosLogin.instance)
+  TestableKerberosLogin.instance = this
+
+  override def reLogin(): Unit = {
+    super.reLogin()
+    reLoginCompleteLatch.countDown()
+  }
+
+  override protected def logout(): Unit = {
+    logoutResumeLatch.await(15, TimeUnit.SECONDS)
+    super.logout()
+    logoutCompleteLatch.countDown()
+    reLoginResumeLatch.await(15, TimeUnit.SECONDS)
+  }
 }

Reply via email to