This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a9dad0fc58da860fb0af1a3ccb902fc3fdaf8c04 Author: Rajini Sivaram <[email protected]> AuthorDate: Thu Apr 29 14:32:50 2021 +0100 KAFKA-12730; Avoid duplicate logout if Kerberos login fails (#10611) From Java 9 onwards, LoginContext#logout() throws an NPE if invoked multiple times due to https://bugs.openjdk.java.net/browse/JDK-8173069. KerberosLogin currently attempts logout followed by login in a background refresh thread. If login fails we retry the same sequence. As a result, a single login failure prevents subsequent re-login. And clients will never be able to authenticate successfully after the first failure, until the process is restarted. The commit checks if logout is necessary before invoking LoginContext#logout(). Also adds a test for this case. Reviewers: Manikumar Reddy <[email protected]> --- .../common/security/kerberos/KerberosLogin.java | 14 ++++- .../kafka/server/GssapiAuthenticationTest.scala | 64 ++++++++++++++++++---- 2 files changed, 63 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java index f39f35c..a91a964 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java @@ -360,17 +360,25 @@ public class KerberosLogin extends AbstractLogin { lastLogin = currentElapsedTime(); //clear up the kerberos state. But the tokens are not cleared! As per //the Java kerberos login module code, only the kerberos credentials - //are cleared - logout(); + //are cleared. If previous logout succeeded but login failed, we shouldn't + //logout again since duplicate logout causes NPE from Java 9 onwards. 
+ if (subject != null && !subject.getPrincipals().isEmpty()) { + logout(); + } //login and also update the subject field of this instance to //have the new credentials (pass it to the LoginContext constructor) loginContext = new LoginContext(contextName(), subject, null, configuration()); log.info("Initiating re-login for {}", principal); - loginContext.login(); + login(loginContext); } } // Visibility to override for testing + protected void login(LoginContext loginContext) throws LoginException { + loginContext.login(); + } + + // Visibility to override for testing protected void logout() throws LoginException { loginContext.logout(); } diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala index fa21a94..efb51f1 100644 --- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala +++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala @@ -23,6 +23,7 @@ import java.time.Duration import java.util.{Collections, Properties} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} +import javax.security.auth.login.LoginContext import kafka.api.{Both, IntegrationTestHarness, SaslSetup} import kafka.utils.TestUtils import org.apache.kafka.clients.CommonClientConfigs @@ -100,6 +101,29 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { } /** + * Verifies that if login fails, subsequent re-login without failures works and clients + * are able to connect after the second re-login. Verifies that logout is performed only once + * since duplicate logouts without successful login results in NPE from Java 9 onwards. 
+ */ + @Test + def testLoginFailure(): Unit = { + val selector = createSelectorWithRelogin() + try { + val login = TestableKerberosLogin.instance + assertNotNull(login) + login.loginException = Some(new RuntimeException("Test exception to fail login")) + executor.submit(() => login.reLogin(), 0) + executor.submit(() => login.reLogin(), 0) + + verifyRelogin(selector, login) + assertEquals(2, login.loginAttempts) + assertEquals(1, login.logoutAttempts) + } finally { + selector.close() + } + } + + /** * Verifies that there are no authentication failures during Kerberos re-login. If authentication * is performed when credentials are unavailable between logout and login, we handle it as a * transient error and not an authentication failure so that clients may retry. @@ -111,23 +135,26 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { val login = TestableKerberosLogin.instance assertNotNull(login) executor.submit(() => login.reLogin(), 0) - - val node1 = "1" - selector.connect(node1, serverAddr, 1024, 1024) - login.logoutResumeLatch.countDown() - login.logoutCompleteLatch.await(15, TimeUnit.SECONDS) - assertFalse("Authenticated during re-login", pollUntilReadyOrDisconnected(selector, node1)) - - login.reLoginResumeLatch.countDown() - login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS) - val node2 = "2" - selector.connect(node2, serverAddr, 1024, 1024) - assertTrue("Authenticated failed after re-login", pollUntilReadyOrDisconnected(selector, node2)) + verifyRelogin(selector, login) } finally { selector.close() } } + private def verifyRelogin(selector: Selector, login: TestableKerberosLogin): Unit = { + val node1 = "1" + selector.connect(node1, serverAddr, 1024, 1024) + login.logoutResumeLatch.countDown() + login.logoutCompleteLatch.await(15, TimeUnit.SECONDS) + assertFalse("Authenticated during re-login", pollUntilReadyOrDisconnected(selector, node1)) + + login.reLoginResumeLatch.countDown() + login.reLoginCompleteLatch.await(15, 
TimeUnit.SECONDS) + val node2 = "2" + selector.connect(node2, serverAddr, 1024, 1024) + assertTrue("Authenticated failed after re-login", pollUntilReadyOrDisconnected(selector, node2)) + } + /** * Tests that Kerberos error `Server not found in Kerberos database (7)` is handled * as a fatal authentication failure. @@ -258,6 +285,9 @@ class TestableKerberosLogin extends KerberosLogin { val logoutCompleteLatch = new CountDownLatch(1) val reLoginResumeLatch = new CountDownLatch(1) val reLoginCompleteLatch = new CountDownLatch(1) + @volatile var loginException: Option[RuntimeException] = None + @volatile var loginAttempts = 0 + @volatile var logoutAttempts = 0 assertNull(TestableKerberosLogin.instance) TestableKerberosLogin.instance = this @@ -267,7 +297,17 @@ class TestableKerberosLogin extends KerberosLogin { reLoginCompleteLatch.countDown() } + override protected def login(loginContext: LoginContext): Unit = { + loginAttempts += 1 + loginException.foreach { e => + loginException = None + throw e + } + super.login(loginContext) + } + override protected def logout(): Unit = { + logoutAttempts += 1 logoutResumeLatch.await(15, TimeUnit.SECONDS) super.logout() logoutCompleteLatch.countDown()
