This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d6e5edf8ef KAFKA-19153: Add OAuth integration tests (#19938)
7d6e5edf8ef is described below

commit 7d6e5edf8eff946b04865263ad8ed176480d7c6e
Author: Kirk True <[email protected]>
AuthorDate: Thu Jun 12 12:48:14 2025 -0700

    KAFKA-19153: Add OAuth integration tests (#19938)
    
    Adds a test dependency on
    [mock-oauth2-server](https://github.com/navikt/mock-oauth2-server/) for
    integration tests for OAuth layer. Also includes fixes for some
    regressions that were caught by the integration tests.
    
    Reviewers: Manikumar Reddy <[email protected]>, Lianet Magrans
     <[email protected]>
---
 build.gradle                                       |   1 +
 .../oauthbearer/ClientCredentialsJwtRetriever.java |   9 +-
 .../OAuthBearerValidatorCallbackHandler.java       |   2 +-
 .../internals/secured/ConfigurationUtils.java      |  12 +
 .../secured/assertion/DefaultAssertionCreator.java |   4 +-
 .../oauthbearer/JwtBearerJwtRetrieverTest.java     |   5 +-
 .../kafka/api/ClientOAuthIntegrationTest.scala     | 261 +++++++++++++++++++++
 gradle/dependencies.gradle                         |   2 +
 licenses/mock-oauth2-server-MIT                    |  21 ++
 9 files changed, 308 insertions(+), 9 deletions(-)

diff --git a/build.gradle b/build.gradle
index 767395ad32d..8e0f4393252 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1060,6 +1060,7 @@ project(':core') {
     testImplementation libs.junitJupiter
     testImplementation libs.caffeine
     testImplementation testLog4j2Libs
+    testImplementation libs.mockOAuth2Server
 
     testRuntimeOnly runtimeTestLibs
   }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
index 627434f6d3c..4744fd91289 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java
@@ -39,7 +39,10 @@ import javax.security.auth.login.AppConfigurationEntry;
 
 import static 
org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
 import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET;
 import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE;
 import static 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
 import static 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
 import static 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
@@ -173,8 +176,8 @@ public class ClientCredentialsJwtRetriever implements 
JwtRetriever {
 
         private String clientId() {
             return getValue(
+                SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID,
                 CLIENT_ID_CONFIG,
-                "clientId",
                 true,
                 cu::validateString,
                 jou::validateString
@@ -183,8 +186,8 @@ public class ClientCredentialsJwtRetriever implements 
JwtRetriever {
 
         private String clientSecret() {
             return getValue(
+                SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
                 CLIENT_SECRET_CONFIG,
-                "clientSecret",
                 true,
                 cu::validatePassword,
                 jou::validateString
@@ -193,8 +196,8 @@ public class ClientCredentialsJwtRetriever implements 
JwtRetriever {
 
         private String scope() {
             return getValue(
+                SASL_OAUTHBEARER_SCOPE,
                 SCOPE_CONFIG,
-                "scope",
                 false,
                 cu::validateString,
                 jou::validateString
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
index 6563d36b8b6..60fa8cdb678 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
@@ -170,7 +170,7 @@ public class OAuthBearerValidatorCallbackHandler implements 
AuthenticateCallback
     }
 
     private void checkConfigured() {
-        if (verificationKeyResolver == null || jwtValidator == null)
+        if (jwtValidator == null)
             throw new IllegalStateException(String.format("To use %s, first 
call the configure method", getClass().getSimpleName()));
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
index a0819766a38..3eebecf8fde 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Utils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
@@ -47,6 +50,8 @@ import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALL
 
 public class ConfigurationUtils {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigurationUtils.class);
+
     private final Map<String, ?> configs;
 
     private final String prefix;
@@ -344,6 +349,13 @@ public class ConfigurationUtils {
                 ((OAuthBearerConfigurable) o).configure(configs, 
saslMechanism, jaasConfigEntries);
             } catch (Exception e) {
                 Utils.maybeCloseQuietly(o, "Instance of class " + 
o.getClass().getName() + " failed call to configure()");
+                LOG.warn(
+                    "The class {} defined in the {} configuration encountered 
an error on configure(): {}",
+                    o.getClass().getName(),
+                    configName,
+                    e.getMessage(),
+                    e
+                );
                 throw new ConfigException(
                     String.format(
                         "The class %s defined in the %s configuration 
encountered an error on configure(): %s",
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
index db562fade87..52b9eb2fb53 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java
@@ -16,7 +16,7 @@
  */
 package 
org.apache.kafka.common.security.oauthbearer.internals.secured.assertion;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.oauthbearer.JwtRetrieverException;
 import 
org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile;
 import org.apache.kafka.common.utils.Utils;
 
@@ -89,7 +89,7 @@ public class DefaultAssertionCreator implements 
AssertionCreator {
 
                 return privateKey(contents.getBytes(StandardCharsets.UTF_8), 
passphrase);
             } catch (GeneralSecurityException | IOException e) {
-                throw new KafkaException("An error occurred generating the 
OAuth assertion private key from " + file.getPath(), e);
+                throw new JwtRetrieverException("An error occurred generating 
the OAuth assertion private key from " + file.getPath(), e);
             }
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
index c466ac83689..4a4e567dedf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.security.oauthbearer;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
 import 
org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
 
@@ -95,7 +94,7 @@ public class JwtBearerJwtRetrieverTest extends 
OAuthBearerTest {
         List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
 
         try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) 
{
-            KafkaException e = assertThrows(KafkaException.class, () -> 
jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
+            JwtRetrieverException e = 
assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, 
OAUTHBEARER_MECHANISM, jaasConfigEntries));
             assertNotNull(e.getCause());
             assertInstanceOf(GeneralSecurityException.class, e.getCause());
         }
@@ -144,7 +143,7 @@ public class JwtBearerJwtRetrieverTest extends 
OAuthBearerTest {
         List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
 
         try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) 
{
-            KafkaException e = assertThrows(KafkaException.class, () -> 
jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
+            JwtRetrieverException e = 
assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, 
OAUTHBEARER_MECHANISM, jaasConfigEntries));
             assertNotNull(e.getCause());
             assertInstanceOf(IOException.class, e.getCause());
         }
diff --git 
a/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
new file mode 100644
index 00000000000..22ab6f2673c
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.api
+
+import com.nimbusds.jose.jwk.RSAKey
+import kafka.api.{IntegrationTestHarness, SaslSetup}
+import kafka.utils.TestInfoUtils
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.{ConfigException, SaslConfigs}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
+
+import java.util.{Base64, Collections, Properties}
+import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
+import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import 
org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler, 
OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+import java.nio.file.StandardOpenOption
+import java.security.{KeyPairGenerator, PrivateKey}
+import java.security.interfaces.RSAPublicKey
+import java.util
+
+/**
+ * Integration tests for the consumer that cover basic usage as well as 
coordinator failure
+ */
+class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup 
{
+
+  override val brokerCount = 3
+
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  override protected val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = 
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+  protected def kafkaClientSaslMechanism = "OAUTHBEARER"
+  protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+
+  val issuerId = "default"
+  var mockOAuthServer: MockOAuth2Server = _
+  var privateKey: PrivateKey = _
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    // Step 1: Generate the key pair dynamically.
+    val keyGen = KeyPairGenerator.getInstance("RSA")
+    keyGen.initialize(2048)
+    val keyPair = keyGen.generateKeyPair()
+
+    privateKey = keyPair.getPrivate
+
+    // Step 2: Create the RSA JWK from key pair.
+    val rsaJWK = new 
RSAKey.Builder(keyPair.getPublic.asInstanceOf[RSAPublicKey])
+      .privateKey(privateKey)
+      .keyID("foo")
+      .build()
+
+    // Step 3: Create the OAuth server using the keys just created
+    val keyProvider = new KeyProvider(Collections.singletonList(rsaJWK))
+    val tokenProvider = new OAuth2TokenProvider(keyProvider)
+    val oauthConfig = new OAuth2Config(false, null, null, false, tokenProvider)
+    mockOAuthServer = new MockOAuth2Server(oauthConfig)
+
+    mockOAuthServer.start()
+    val tokenEndpointUrl = 
mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
+    val jwksUrl = mockOAuthServer.jwksUrl(issuerId).url().toString
+    
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
s"$tokenEndpointUrl,$jwksUrl")
+
+    val listenerNamePrefix = 
s"listener.name.${listenerName.value().toLowerCase}"
+
+    
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_JAAS_CONFIG}",
 s"${classOf[OAuthBearerLoginModule].getName} required ;")
+    
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE}",
 issuerId)
+    
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL}",
 jwksUrl)
+    
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG}",
 classOf[OAuthBearerValidatorCallbackHandler].getName)
+
+    // create static config including client login context with credentials 
for JaasTestUtils 'client2'
+    startSasl(jaasSections(kafkaServerSaslMechanisms, 
Option(kafkaClientSaslMechanism)))
+
+    // The superuser needs the configuration in setUp because it's used to 
create resources before the individual
+    // test methods are invoked.
+    superuserClientConfig.putAll(defaultClientCredentialsConfigs())
+
+    super.setUp(testInfo)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (mockOAuthServer != null)
+      mockOAuthServer.shutdown()
+
+    closeSasl()
+    super.tearDown()
+
+    
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG)
+    
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG)
+  }
+
+  def defaultOAuthConfigs(): Properties = {
+    val tokenEndpointUrl = 
mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
+
+    val configs = new Properties()
+    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.name)
+    configs.put(SaslConfigs.SASL_JAAS_CONFIG, 
jaasClientLoginModule(kafkaClientSaslMechanism))
+    configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, 
classOf[OAuthBearerLoginCallbackHandler].getName)
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
tokenEndpointUrl)
+    configs
+  }
+
+  def defaultClientCredentialsConfigs(): Properties = {
+    val configs = defaultOAuthConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, 
"test-client")
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET, 
"test-secret")
+    configs
+  }
+
+  def defaultJwtBearerConfigs(): Properties = {
+    val configs = defaultOAuthConfigs()
+    configs.put(SaslConfigs.SASL_JAAS_CONFIG, 
jaasClientLoginModule(kafkaClientSaslMechanism))
+    configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, 
classOf[OAuthBearerLoginCallbackHandler].getName)
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, 
"org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever")
+    configs
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testBasicClientCredentials(groupProtocol: String): Unit = {
+    val configs = defaultClientCredentialsConfigs()
+    assertDoesNotThrow(() => createProducer(configOverrides = configs))
+    assertDoesNotThrow(() => createConsumer(configOverrides = configs))
+    assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testBasicJwtBearer(groupProtocol: String): Unit = {
+    val jwt = mockOAuthServer.issueToken(issuerId, "jdoe", "someaudience", 
Collections.singletonMap("scope", "test"))
+    val assertionFile = TestUtils.tempFile(jwt.serialize())
+    
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, 
assertionFile.getAbsolutePath)
+
+    val configs = defaultJwtBearerConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, 
assertionFile.getAbsolutePath)
+
+    assertDoesNotThrow(() => createProducer(configOverrides = configs))
+    assertDoesNotThrow(() => createConsumer(configOverrides = configs))
+    assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testBasicJwtBearer2(groupProtocol: String): Unit = {
+    val privateKeyFile = generatePrivateKeyFile()
+    
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, 
privateKeyFile.getAbsolutePath)
+
+    val configs = defaultJwtBearerConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE, 
privateKeyFile.getPath)
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD, "default")
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB, 
"kafka-client-test-sub")
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "default")
+    //    configs.put(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, "aud")
+
+    assertDoesNotThrow(() => createProducer(configOverrides = configs))
+    assertDoesNotThrow(() => createConsumer(configOverrides = configs))
+    assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
+  }
+
+  @Disabled("KAFKA-19394: Failure in 
ConsumerNetworkThread.initializeResources() can cause hangs on 
AsyncKafkaConsumer.close()")
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testJwtBearerWithMalformedAssertionFile(groupProtocol: String): Unit = {
+    // Create the assertion file, but fill it with non-JWT garbage.
+    val assertionFile = TestUtils.tempFile("CQEN*)Q#F)&)^#QNC")
+    
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, 
assertionFile.getAbsolutePath)
+
+    val configs = defaultJwtBearerConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, 
assertionFile.getAbsolutePath)
+
+    assertThrows(classOf[KafkaException], () => createProducer(configOverrides 
= configs))
+    assertThrows(classOf[KafkaException], () => createConsumer(configOverrides 
= configs))
+    assertThrows(classOf[KafkaException], () => 
createAdminClient(configOverrides = configs))
+  }
+
+  @Disabled("KAFKA-19394: Failure in 
ConsumerNetworkThread.initializeResources() can cause hangs on 
AsyncKafkaConsumer.close()")
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testJwtBearerWithEmptyAssertionFile(groupProtocol: String): Unit = {
+    // Create the assertion file, but leave it empty.
+    val assertionFile = TestUtils.tempFile()
+    
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, 
assertionFile.getAbsolutePath)
+
+    val configs = defaultJwtBearerConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, 
assertionFile.getAbsolutePath)
+
+    assertThrows(classOf[KafkaException], () => createProducer(configOverrides 
= configs))
+    assertThrows(classOf[KafkaException], () => createConsumer(configOverrides 
= configs))
+    assertThrows(classOf[KafkaException], () => 
createAdminClient(configOverrides = configs))
+  }
+
+  @Disabled("KAFKA-19394: Failure in 
ConsumerNetworkThread.initializeResources() can cause hangs on 
AsyncKafkaConsumer.close()")
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testJwtBearerWithMissingAssertionFile(groupProtocol: String): Unit = {
+    val missingFileName = "/this/does/not/exist.txt"
+
+    val configs = defaultJwtBearerConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, missingFileName)
+
+    assertThrows(classOf[KafkaException], () => createProducer(configOverrides 
= configs))
+    assertThrows(classOf[KafkaException], () => createConsumer(configOverrides 
= configs))
+    assertThrows(classOf[KafkaException], () => 
createAdminClient(configOverrides = configs))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testUnsupportedJwtRetriever(groupProtocol: String): Unit = {
+    val className = 
"org.apache.kafka.common.security.oauthbearer.ThisIsNotARealJwtRetriever"
+
+    val configs = defaultOAuthConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
+
+    assertThrows(classOf[ConfigException], () => 
createProducer(configOverrides = configs))
+    assertThrows(classOf[ConfigException], () => 
createConsumer(configOverrides = configs))
+    assertThrows(classOf[ConfigException], () => 
createAdminClient(configOverrides = configs))
+  }
+
+  def generatePrivateKeyFile(): File = {
+    val file = File.createTempFile("private-", ".key")
+    val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
+    var channel: FileChannel = null
+
+    try {
+      channel = FileChannel.open(file.toPath, 
util.EnumSet.of(StandardOpenOption.WRITE))
+      Utils.writeFully(channel, ByteBuffer.wrap(bytes))
+    } finally {
+      channel.close()
+    }
+
+    file
+  }
+}
\ No newline at end of file
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index ba83624b0de..7d278545054 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -123,6 +123,7 @@ versions += [
   slf4j: "1.7.36",
   snappy: "1.1.10.7",
   spotbugs: "4.8.6",
+  mockOAuth2Server: "2.2.1",
   zinc: "1.9.2",
   // When updating the zstd version, please do as well in 
docker/native/native-image-configs/resource-config.json
   // Also make sure the compression levels in 
org.apache.kafka.common.record.CompressionType are still valid
@@ -224,6 +225,7 @@ libs += [
   snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
   spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
   swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion",
+  mockOAuth2Server: 
"no.nav.security:mock-oauth2-server:$versions.mockOAuth2Server",
   jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
   mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
   zstd: "com.github.luben:zstd-jni:$versions.zstd",
diff --git a/licenses/mock-oauth2-server-MIT b/licenses/mock-oauth2-server-MIT
new file mode 100644
index 00000000000..ef1b1129b10
--- /dev/null
+++ b/licenses/mock-oauth2-server-MIT
@@ -0,0 +1,21 @@
+# The MIT License
+
+Copyright 2025 NAV (Arbeids- og velferdsdirektoratet) - The Norwegian Labour 
and Welfare Administration
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the "Software"),
+to deal in the Software without restriction, including without limitation
+the rights to use, copy, modify, merge, publish, distribute, sublicense,
+and/or sell copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file

Reply via email to