This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4836ed79ce8 MINOR: Code cleanup and additional tests for 
DefaultJwtValidator
4836ed79ce8 is described below

commit 4836ed79ce80e826834b7490c085d0cfa8a3bc89
Author: Kirk True <[email protected]>
AuthorDate: Mon Nov 24 10:25:42 2025 -0800

    MINOR: Code cleanup and additional tests for DefaultJwtValidator
---
 .../security/oauthbearer/DefaultJwtValidator.java  | 10 ++++++-
 .../oauthbearer/DefaultJwtValidatorTest.java       | 33 ++++++++++++++++++++
 .../kafka/api/ClientOAuthIntegrationTest.scala     | 35 ++++++++++++++++++++--
 3 files changed, 74 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
index 478a0fdc916..3d5710fb64c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
@@ -17,7 +17,9 @@
 
 package org.apache.kafka.common.security.oauthbearer;
 
+import org.apache.kafka.common.config.SaslConfigs;
 import 
org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
+import 
org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
 import org.apache.kafka.common.utils.Utils;
 
 import org.jose4j.keys.resolvers.VerificationKeyResolver;
@@ -54,7 +56,13 @@ public class DefaultJwtValidator implements JwtValidator {
         if (verificationKeyResolver.isPresent()) {
             delegate = new BrokerJwtValidator(verificationKeyResolver.get());
         } else {
-            delegate = new ClientJwtValidator();
+            ConfigurationUtils cu = new ConfigurationUtils(configs, 
saslMechanism);
+
+            if 
(cu.containsKey(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL)) {
+                delegate = new BrokerJwtValidator();
+            } else {
+                delegate = new ClientJwtValidator();
+            }
         }
 
         delegate.configure(configs, saslMechanism, jaasConfigEntries);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
index 14c33a012c8..cf3754a77ac 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
@@ -17,21 +17,34 @@
 
 package org.apache.kafka.common.security.oauthbearer;
 
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import 
org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenBuilder;
 import 
org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
 import 
org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
 
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.jwk.JsonWebKeySet;
+import org.jose4j.jwk.PublicJsonWebKey;
 import org.jose4j.jws.AlgorithmIdentifiers;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
 import static 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 
 public class DefaultJwtValidatorTest extends OAuthBearerTest {
 
+    @AfterEach
+    public void tearDown() {
+        
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+    }
+
     @Test
     public void testConfigureWithVerificationKeyResolver() {
         AccessTokenBuilder builder = new AccessTokenBuilder()
@@ -51,6 +64,26 @@ public class DefaultJwtValidatorTest extends OAuthBearerTest 
{
         assertInstanceOf(ClientJwtValidator.class, jwtValidator.delegate());
     }
 
+    @Test
+    public void testConfigureWithJwksUrl() throws Exception {
+        PublicJsonWebKey jwk = createRsaJwk();
+        AccessTokenBuilder builder = new AccessTokenBuilder()
+            .jwk(jwk)
+            .alg(AlgorithmIdentifiers.RSA_USING_SHA256);
+        String accessToken = builder.build();
+
+        JsonWebKeySet jwks = new JsonWebKeySet(jwk);
+        String jwksJson = 
jwks.toJson(JsonWebKey.OutputControlLevel.PUBLIC_ONLY);
+        String fileUrl = tempFile(jwksJson).toURI().toString();
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, fileUrl);
+        Map<String, ?> configs = 
getSaslConfigs(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, fileUrl);
+
+        DefaultJwtValidator jwtValidator = new DefaultJwtValidator();
+        assertDoesNotThrow(() -> jwtValidator.configure(configs, 
OAUTHBEARER_MECHANISM, getJaasConfigEntries()));
+        assertInstanceOf(BrokerJwtValidator.class, jwtValidator.delegate());
+        assertDoesNotThrow(() -> jwtValidator.validate(accessToken));
+    }
+
     private CloseableVerificationKeyResolver 
createVerificationKeyResolver(AccessTokenBuilder builder) {
         return (jws, nestingContext) -> builder.jwk().getPublicKey();
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
index 22ab6f2673c..8745e7ce969 100644
--- a/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
@@ -26,10 +26,11 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, 
Disabled, TestInfo}
 import java.util.{Base64, Collections, Properties}
 import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
 import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
-import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import 
org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler, 
OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
+import org.apache.kafka.common.security.oauthbearer.{JwtRetriever, 
OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, 
OAuthBearerValidatorCallbackHandler}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
@@ -244,6 +245,27 @@ class ClientOAuthIntegrationTest extends 
IntegrationTestHarness with SaslSetup {
     assertThrows(classOf[ConfigException], () => 
createAdminClient(configOverrides = configs))
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testAuthenticationErrorOnTamperedJwt(groupProtocol: String): Unit = {
+    val className = classOf[TamperedJwtRetriever].getName
+
+    val configs = defaultOAuthConfigs()
+    configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
+
+    val tp = new TopicPartition("test-topic", 0)
+
+    val admin = createAdminClient(configOverrides = configs)
+    TestUtils.assertFutureThrows(classOf[SaslAuthenticationException], 
admin.describeCluster().clusterId())
+
+    val producer = createProducer(configOverrides = configs)
+    assertThrows(classOf[SaslAuthenticationException], () => 
producer.partitionsFor(tp.topic()))
+
+    val consumer = createConsumer(configOverrides = configs)
+    consumer.assign(Collections.singleton(tp))
+    assertThrows(classOf[SaslAuthenticationException], () => 
consumer.position(tp))
+  }
+
   def generatePrivateKeyFile(): File = {
     val file = File.createTempFile("private-", ".key")
     val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
@@ -258,4 +280,11 @@ class ClientOAuthIntegrationTest extends 
IntegrationTestHarness with SaslSetup {
 
     file
   }
-}
\ No newline at end of file
+}
+
+class TamperedJwtRetriever extends JwtRetriever {
+
+  override def retrieve(): String = {
+    
"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJzdWIiOiAiMTIzNDU2Nzg5MCIsICJuYW1lIjogIkpvaG4gRG9lIiwgInJvbGUiOiAiYWRtaW4iLCAiaWF0IjogMTUxNjIzOTAyMiwgImV4cCI6IDE5MTYyMzkwMjJ9.vVT5ylQCGvb0B-wv1YXHjmlMd-DZKCThUt5-enry_sA"
+  }
+}

Reply via email to