This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15c5c075c17 MINOR: Clean up for sasl endpoints (#18519)
15c5c075c17 is described below

commit 15c5c075c17f1fa8a8b958bf299e80a1b1f2ae11
Author: Luke Chen <[email protected]>
AuthorDate: Fri Jan 31 17:27:04 2025 +0900

    MINOR: Clean up for sasl endpoints (#18519)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../config/internals/BrokerSecurityConfigs.java    |  4 ++
 .../internals/secured/ConfigurationUtils.java      | 31 ++++++--
 .../OAuthBearerLoginCallbackHandlerTest.java       | 10 +++
 .../secured/AccessTokenRetrieverFactoryTest.java   | 25 ++++++-
 .../internals/secured/ConfigurationUtilsTest.java  | 40 +++++++++++
 .../VerificationKeyResolverFactoryTest.java        | 82 ++++++++++++++++++++++
 6 files changed, 185 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 0021c3d11ff..0b26733f0c4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -130,6 +130,10 @@ public class BrokerSecurityConfigs {
 
     public static final String SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG = 
"sasl.mechanism.inter.broker.protocol";
     public static final String SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC = 
"SASL mechanism used for inter-broker communication. Default is GSSAPI.";
+
+    // The allowlist of the SASL OAUTHBEARER endpoints
+    public static final String ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG = 
"org.apache.kafka.sasl.oauthbearer.allowed.urls";
+    public static final String ALLOWED_SASL_OAUTHBEARER_URLS_DEFAULT = "";
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
             // General Security Configuration
             .define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG, 
LONG, BrokerSecurityConfigs.DEFAULT_CONNECTIONS_MAX_REAUTH_MS, MEDIUM, 
BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
index 0be91cdfb5b..10f700826c8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java
@@ -25,8 +25,14 @@ import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
+import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_DEFAULT;
 
 /**
  * <code>ConfigurationUtils</code> is a utility class to perform basic 
configuration-related
@@ -151,11 +157,12 @@ public class ConfigurationUtils {
     /**
      * Validates that the configured URL that:
      *
-     * <li>
-     *     <ul>is well-formed</ul>
-     *     <ul>contains a scheme</ul>
-     *     <ul>uses either HTTP, HTTPS, or file protocols</ul>
-     * </li>
+     * <ul>
+     *     <li>is well-formed</li>
+     *     <li>contains a scheme</li>
+     *     <li>uses either HTTP, HTTPS, or file protocols</li>
+     *     <li>is in the allow-list</li>
+     * </ul>
      *
      * No effort is made to connect to the URL in the validation step.
      */
@@ -180,6 +187,8 @@ public class ConfigurationUtils {
         if (!(protocol.equals("http") || protocol.equals("https") || 
protocol.equals("file")))
             throw new ConfigException(String.format("The OAuth configuration 
option %s contains a URL (%s) that contains an invalid protocol (%s); only 
\"http\", \"https\", and \"file\" protocol are supported", name, value, 
protocol));
 
+        throwIfURLIsNotAllowed(value);
+
         return url;
     }
 
@@ -228,4 +237,16 @@ public class ConfigurationUtils {
         return (T) configs.get(name);
     }
 
+    // visible for testing
+    // make sure the url is in the 
"org.apache.kafka.sasl.oauthbearer.allowed.urls" system property
+    void throwIfURLIsNotAllowed(String value) {
+        Set<String> allowedUrls = Arrays.stream(
+                        
System.getProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
ALLOWED_SASL_OAUTHBEARER_URLS_DEFAULT).split(","))
+                .map(String::trim)
+                .collect(Collectors.toSet());
+        if (!allowedUrls.contains(value)) {
+            throw new ConfigException(value + " is not allowed. Update system 
property '"
+                    + ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG + "' to allow " + 
value);
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java
index 70cfd41b9c1..5b1b2976662 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBeare
 import org.apache.kafka.common.utils.Utils;
 
 import org.jose4j.jws.AlgorithmIdentifiers;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -45,6 +46,7 @@ import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 
 import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
 import static 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
 import static 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -56,6 +58,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class OAuthBearerLoginCallbackHandlerTest extends OAuthBearerTest {
+    @AfterEach
+    public void tearDown() throws Exception {
+        System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+    }
 
     @Test
     public void testHandleTokenCallback() throws Exception {
@@ -87,6 +93,7 @@ public class OAuthBearerLoginCallbackHandlerTest extends 
OAuthBearerTest {
     public void testHandleSaslExtensionsCallback() throws Exception {
         OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
         Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com";);
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
"http://www.example.com";);
         Map<String, Object> jaasConfig = new HashMap<>();
         jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
         jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
@@ -116,6 +123,7 @@ public class OAuthBearerLoginCallbackHandlerTest extends 
OAuthBearerTest {
 
         OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
         Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com";);
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
"http://www.example.com";);
         Map<String, Object> jaasConfig = new HashMap<>();
         jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
         jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
@@ -212,6 +220,7 @@ public class OAuthBearerLoginCallbackHandlerTest extends 
OAuthBearerTest {
 
         File tmpDir = createTempDir("access-token");
         File accessTokenFile = createTempFile(tmpDir, "access-token-", 
".json", expected);
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
accessTokenFile.toURI().toString());
 
         OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
         Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
accessTokenFile.toURI().toString());
@@ -224,6 +233,7 @@ public class OAuthBearerLoginCallbackHandlerTest extends 
OAuthBearerTest {
     public void testConfigureWithAccessClientCredentials() {
         OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
         Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com";);
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
"http://www.example.com";);
         Map<String, Object> jaasConfigs = new HashMap<>();
         jaasConfigs.put(CLIENT_ID_CONFIG, "an ID");
         jaasConfigs.put(CLIENT_SECRET_CONFIG, "a secret");
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
index 478e2baba1d..3e85f7b0ce4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
@@ -19,6 +19,7 @@ package 
org.apache.kafka.common.security.oauthbearer.internals.secured;
 
 import org.apache.kafka.common.config.ConfigException;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -32,10 +33,16 @@ import java.util.stream.Stream;
 import static 
org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
 import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
 import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class AccessTokenRetrieverFactoryTest extends OAuthBearerTest {
 
+    @AfterEach
+    public void tearDown() throws Exception {
+        System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+    }
+
     @Test
     public void testConfigureRefreshingFileAccessTokenRetriever() throws 
Exception {
         String expected = "{}";
@@ -43,6 +50,7 @@ public class AccessTokenRetrieverFactoryTest extends 
OAuthBearerTest {
         File tmpDir = createTempDir("access-token");
         File accessTokenFile = createTempFile(tmpDir, "access-token-", 
".json", expected);
 
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
accessTokenFile.toURI().toString());
         Map<String, ?> configs = 
Collections.singletonMap(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
accessTokenFile.toURI().toString());
         Map<String, Object> jaasConfig = Collections.emptyMap();
 
@@ -55,21 +63,34 @@ public class AccessTokenRetrieverFactoryTest extends 
OAuthBearerTest {
     @Test
     public void 
testConfigureRefreshingFileAccessTokenRetrieverWithInvalidDirectory() {
         // Should fail because the parent path doesn't exist.
-        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, new 
File("/tmp/this-directory-does-not-exist/foo.json").toURI().toString());
+        String file = new 
File("/tmp/this-directory-does-not-exist/foo.json").toURI().toString();
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, file);
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, file);
         Map<String, Object> jaasConfig = Collections.emptyMap();
         assertThrowsWithMessage(ConfigException.class, () -> 
AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
     }
 
     @Test
     public void 
testConfigureRefreshingFileAccessTokenRetrieverWithInvalidFile() throws 
Exception {
-        // Should fail because the while the parent path exists, the file 
itself doesn't.
+        // Should fail because while the parent path exists, the file itself 
doesn't.
         File tmpDir = createTempDir("this-directory-does-exist");
         File accessTokenFile = new File(tmpDir, 
"this-file-does-not-exist.json");
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
accessTokenFile.toURI().toString());
         Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
accessTokenFile.toURI().toString());
         Map<String, Object> jaasConfig = Collections.emptyMap();
         assertThrowsWithMessage(ConfigException.class, () -> 
AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
     }
 
+    @Test
+    public void testSaslOauthbearerTokenEndpointUrlIsNotAllowed() throws 
Exception {
+        // Should fail if the URL is not allowed
+        File tmpDir = createTempDir("not_allowed");
+        File accessTokenFile = new File(tmpDir, "not_allowed.json");
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
accessTokenFile.toURI().toString());
+        assertThrowsWithMessage(ConfigException.class, () -> 
AccessTokenRetrieverFactory.create(configs, Collections.emptyMap()),
+                ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+    }
+
     @ParameterizedTest
     @MethodSource("urlencodeHeaderSupplier")
     public void testUrlencodeHeader(Map<String, Object> configs, boolean 
expectedValue) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
index e2a8ba135c0..9a62f480215 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtilsTest.java
@@ -20,16 +20,27 @@ package 
org.apache.kafka.common.security.oauthbearer.internals.secured;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
 public class ConfigurationUtilsTest extends OAuthBearerTest {
 
     private static final String URL_CONFIG_NAME = "url";
+    private static final String FILE_CONFIG_NAME = "file";
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+    }
 
     @Test
     public void testUrl() {
@@ -82,6 +93,7 @@ public class ConfigurationUtilsTest extends OAuthBearerTest {
     }
 
     private void testUrl(String value) {
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, value == null 
? "" : value);
         Map<String, Object> configs = 
Collections.singletonMap(URL_CONFIG_NAME, value);
         ConfigurationUtils cu = new ConfigurationUtils(configs);
         cu.validateUrl(URL_CONFIG_NAME);
@@ -129,7 +141,35 @@ public class ConfigurationUtilsTest extends 
OAuthBearerTest {
         assertThrowsWithMessage(ConfigException.class, () -> testFile("    "), 
"must not contain only whitespace");
     }
 
+    @Test
+    public void testThrowIfURLIsNotAllowed() {
+        String url = "http://www.example.com";;
+        String fileUrl = "file:///etc/passwd";
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(URL_CONFIG_NAME, url);
+        configs.put(FILE_CONFIG_NAME, fileUrl);
+        ConfigurationUtils cu = new ConfigurationUtils(configs);
+
+        // By default, no URL is allowed
+        assertThrowsWithMessage(ConfigException.class, () -> 
cu.throwIfURLIsNotAllowed(url),
+                ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+        assertThrowsWithMessage(ConfigException.class, () -> 
cu.throwIfURLIsNotAllowed(fileUrl),
+                ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+
+        // add one url into allowed list
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, url);
+        assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(url));
+        assertThrowsWithMessage(ConfigException.class, () -> 
cu.throwIfURLIsNotAllowed(fileUrl),
+                ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+
+        // add all urls into allowed list
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, url + "," + 
fileUrl);
+        assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(url));
+        assertDoesNotThrow(() -> cu.throwIfURLIsNotAllowed(fileUrl));
+    }
+
     protected void testFile(String value) {
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, value == null 
? "" : value);
         Map<String, Object> configs = 
Collections.singletonMap(URL_CONFIG_NAME, value);
         ConfigurationUtils cu = new ConfigurationUtils(configs);
         cu.validateFile(URL_CONFIG_NAME);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactoryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactoryTest.java
new file mode 100644
index 00000000000..c2324b9d2da
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactoryTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL;
+import static 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
+
+public class VerificationKeyResolverFactoryTest extends OAuthBearerTest {
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        System.clearProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+    }
+
+    @Test
+    public void testConfigureRefreshingFileVerificationKeyResolver() throws 
Exception {
+        File tmpDir = createTempDir("access-token");
+        File verificationKeyFile = createTempFile(tmpDir, "access-token-", 
".json", "{}");
+
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
verificationKeyFile.toURI().toString());
+        Map<String, ?> configs = 
Collections.singletonMap(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, 
verificationKeyFile.toURI().toString());
+        Map<String, Object> jaasConfig = Collections.emptyMap();
+
+        // verify it won't throw exception
+        try (CloseableVerificationKeyResolver verificationKeyResolver = 
VerificationKeyResolverFactory.create(configs, jaasConfig)) { }
+    }
+
+    @Test
+    public void 
testConfigureRefreshingFileVerificationKeyResolverWithInvalidDirectory() {
+        // Should fail because the parent path doesn't exist.
+        String file = new 
File("/tmp/this-directory-does-not-exist/foo.json").toURI().toString();
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, file);
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, file);
+        Map<String, Object> jaasConfig = Collections.emptyMap();
+        assertThrowsWithMessage(ConfigException.class, () -> 
VerificationKeyResolverFactory.create(configs, jaasConfig), "that doesn't 
exist");
+    }
+
+    @Test
+    public void 
testConfigureRefreshingFileVerificationKeyResolverWithInvalidFile() throws 
Exception {
+        // Should fail because while the parent path exists, the file itself 
doesn't.
+        File tmpDir = createTempDir("this-directory-does-exist");
+        File verificationKeyFile = new File(tmpDir, 
"this-file-does-not-exist.json");
+        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, 
verificationKeyFile.toURI().toString());
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, 
verificationKeyFile.toURI().toString());
+        Map<String, Object> jaasConfig = Collections.emptyMap();
+        assertThrowsWithMessage(ConfigException.class, () -> 
VerificationKeyResolverFactory.create(configs, jaasConfig), "that doesn't 
exist");
+    }
+
+    @Test
+    public void testSaslOauthbearerTokenEndpointUrlIsNotAllowed() throws 
Exception {
+        // Should fail if the URL is not allowed
+        File tmpDir = createTempDir("not_allowed");
+        File verificationKeyFile = new File(tmpDir, "not_allowed.json");
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, 
verificationKeyFile.toURI().toString());
+        assertThrowsWithMessage(ConfigException.class, () -> 
VerificationKeyResolverFactory.create(configs, Collections.emptyMap()),
+                ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
+    }
+}

Reply via email to