This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4836ed79ce8 MINOR: Code cleanup and additional tests for
DefaultJwtValidator
4836ed79ce8 is described below
commit 4836ed79ce80e826834b7490c085d0cfa8a3bc89
Author: Kirk True <[email protected]>
AuthorDate: Mon Nov 24 10:25:42 2025 -0800
MINOR: Code cleanup and additional tests for DefaultJwtValidator
---
.../security/oauthbearer/DefaultJwtValidator.java | 10 ++++++-
.../oauthbearer/DefaultJwtValidatorTest.java | 33 ++++++++++++++++++++
.../kafka/api/ClientOAuthIntegrationTest.scala | 35 ++++++++++++++++++++--
3 files changed, 74 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
index 478a0fdc916..3d5710fb64c 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java
@@ -17,7 +17,9 @@
package org.apache.kafka.common.security.oauthbearer;
+import org.apache.kafka.common.config.SaslConfigs;
import
org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
+import
org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
import org.apache.kafka.common.utils.Utils;
import org.jose4j.keys.resolvers.VerificationKeyResolver;
@@ -54,7 +56,13 @@ public class DefaultJwtValidator implements JwtValidator {
if (verificationKeyResolver.isPresent()) {
delegate = new BrokerJwtValidator(verificationKeyResolver.get());
} else {
- delegate = new ClientJwtValidator();
+ ConfigurationUtils cu = new ConfigurationUtils(configs,
saslMechanism);
+
+ if
(cu.containsKey(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL)) {
+ delegate = new BrokerJwtValidator();
+ } else {
+ delegate = new ClientJwtValidator();
+ }
}
delegate.configure(configs, saslMechanism, jaasConfigEntries);
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
index 14c33a012c8..cf3754a77ac 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java
@@ -17,21 +17,34 @@
package org.apache.kafka.common.security.oauthbearer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import
org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenBuilder;
import
org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
import
org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.jwk.JsonWebKeySet;
+import org.jose4j.jwk.PublicJsonWebKey;
import org.jose4j.jws.AlgorithmIdentifiers;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.util.Map;
+import static
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
import static
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
public class DefaultJwtValidatorTest extends OAuthBearerTest {
+ @AfterEach
+ public void tearDown() {
+
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+ }
+
@Test
public void testConfigureWithVerificationKeyResolver() {
AccessTokenBuilder builder = new AccessTokenBuilder()
@@ -51,6 +64,26 @@ public class DefaultJwtValidatorTest extends OAuthBearerTest
{
assertInstanceOf(ClientJwtValidator.class, jwtValidator.delegate());
}
+ @Test
+ public void testConfigureWithJwksUrl() throws Exception {
+ PublicJsonWebKey jwk = createRsaJwk();
+ AccessTokenBuilder builder = new AccessTokenBuilder()
+ .jwk(jwk)
+ .alg(AlgorithmIdentifiers.RSA_USING_SHA256);
+ String accessToken = builder.build();
+
+ JsonWebKeySet jwks = new JsonWebKeySet(jwk);
+ String jwksJson =
jwks.toJson(JsonWebKey.OutputControlLevel.PUBLIC_ONLY);
+ String fileUrl = tempFile(jwksJson).toURI().toString();
+ System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, fileUrl);
+ Map<String, ?> configs =
getSaslConfigs(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, fileUrl);
+
+ DefaultJwtValidator jwtValidator = new DefaultJwtValidator();
+ assertDoesNotThrow(() -> jwtValidator.configure(configs,
OAUTHBEARER_MECHANISM, getJaasConfigEntries()));
+ assertInstanceOf(BrokerJwtValidator.class, jwtValidator.delegate());
+ assertDoesNotThrow(() -> jwtValidator.validate(accessToken));
+ }
+
private CloseableVerificationKeyResolver
createVerificationKeyResolver(AccessTokenBuilder builder) {
return (jws, nestingContext) -> builder.jwk().getPublicKey();
}
diff --git
a/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
index 22ab6f2673c..8745e7ce969 100644
--- a/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala
@@ -26,10 +26,11 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach,
Disabled, TestInfo}
import java.util.{Base64, Collections, Properties}
import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
-import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.errors.SaslAuthenticationException
import org.apache.kafka.common.security.auth.SecurityProtocol
-import
org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler,
OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
+import org.apache.kafka.common.security.oauthbearer.{JwtRetriever,
OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule,
OAuthBearerValidatorCallbackHandler}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
@@ -244,6 +245,27 @@ class ClientOAuthIntegrationTest extends
IntegrationTestHarness with SaslSetup {
assertThrows(classOf[ConfigException], () =>
createAdminClient(configOverrides = configs))
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testAuthenticationErrorOnTamperedJwt(groupProtocol: String): Unit = {
+ val className = classOf[TamperedJwtRetriever].getName
+
+ val configs = defaultOAuthConfigs()
+ configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
+
+ val tp = new TopicPartition("test-topic", 0)
+
+ val admin = createAdminClient(configOverrides = configs)
+ TestUtils.assertFutureThrows(classOf[SaslAuthenticationException],
admin.describeCluster().clusterId())
+
+ val producer = createProducer(configOverrides = configs)
+ assertThrows(classOf[SaslAuthenticationException], () =>
producer.partitionsFor(tp.topic()))
+
+ val consumer = createConsumer(configOverrides = configs)
+ consumer.assign(Collections.singleton(tp))
+ assertThrows(classOf[SaslAuthenticationException], () =>
consumer.position(tp))
+ }
+
def generatePrivateKeyFile(): File = {
val file = File.createTempFile("private-", ".key")
val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
@@ -258,4 +280,11 @@ class ClientOAuthIntegrationTest extends
IntegrationTestHarness with SaslSetup {
file
}
-}
\ No newline at end of file
+}
+
+class TamperedJwtRetriever extends JwtRetriever {
+
+ override def retrieve(): String = {
+
"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJzdWIiOiAiMTIzNDU2Nzg5MCIsICJuYW1lIjogIkpvaG4gRG9lIiwgInJvbGUiOiAiYWRtaW4iLCAiaWF0IjogMTUxNjIzOTAyMiwgImV4cCI6IDE5MTYyMzkwMjJ9.vVT5ylQCGvb0B-wv1YXHjmlMd-DZKCThUt5-enry_sA"
+ }
+}